feat: Autosharding API for RELAY subscriptions (#1983)

This commit is contained in:
Simon-Pierre Vivier 2023-09-26 07:33:52 -04:00 committed by GitHub
parent d178105d9f
commit 1763b1efa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 905 additions and 327 deletions

View File

@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message)
asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)
# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
@ -490,8 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg)
let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
if conf.rlnRelay:
info "WakuRLNRelay is enabled"

View File

@ -95,7 +95,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
await cmb.nodev2.publish(DefaultPubsubTopic, msg)
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
@ -204,7 +204,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
@ -229,8 +229,8 @@ when isMainModule:
# Install enabled API handlers:
if conf.relay:
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, rpcServer, cache)
if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)

View File

@ -402,7 +402,7 @@ proc subscribeAndHandleMessages(node: WakuNode,
else:
msgPerContentTopic[msg.contentTopic] = 1
node.subscribe(pubsubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError

View File

@ -28,6 +28,21 @@ import
../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
@ -41,22 +56,6 @@ import
./wakunode2_validator_signed,
./internal_config,
./external_config
import
../../waku/waku_api/message_cache,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api
logScope:
topics = "wakunode app"
@ -576,8 +575,20 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Relay REST API
if conf.relay:
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)
let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
installRelayApiHandlers(server.router, app.node, cache)
## Filter REST API
if conf.filter:
@ -610,8 +621,20 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server)
if conf.relay:
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(app.node, server, relayMessageCache)
let cache = MessageCache[string].init(capacity=30)
let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
installRelayApiHandlers(app.node, server, cache)
if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)

View File

@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)

View File

@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
when isMainModule:
let rng = crypto.newRng()

View File

@ -54,9 +54,9 @@ procSuite "Peer Exchange":
peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange
await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler))
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1

View File

@ -415,9 +415,9 @@ procSuite "Waku Discovery v5":
asyncSpawn node.subscriptionsListener(queue)
## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))
await sleepAsync(1.seconds)
@ -426,9 +426,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))
await sleepAsync(1.seconds)
@ -437,9 +437,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3))
queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit((kind: PubsubUnsub, topic: shard3))
await sleepAsync(1.seconds)

View File

@ -67,10 +67,10 @@ suite "WakuNode":
msg.payload == payload
completionFut.complete(true)
node2.subscribe(pubSubTopic, relayHandler)
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)
await sleepAsync(2000.millis)
check:

View File

@ -49,7 +49,7 @@ suite "WakuNode - Lightpush":
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
# Wait for subscription to take effect
await sleepAsync(100.millis)

View File

@ -46,8 +46,8 @@ suite "Waku Relay":
networkB = "test-network2"
## when
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())
## Then
check:
@ -73,9 +73,9 @@ suite "Waku Relay":
networkB = "test-network2"
networkC = "test-network3"
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkC, noopRawHandler())
let topics = toSeq(nodeA.subscribedTopics)
require:
@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC)
## When
nodeA.unsubscribe(networkA)
nodeA.unsubscribeAll(networkA)
## Then
check:
@ -129,14 +129,14 @@ suite "Waku Relay":
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message))
srcNode.subscribe(networkTopic, srcSubsHandler)
discard srcNode.subscribe(networkTopic, srcSubsHandler)
# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)
@ -196,7 +196,7 @@ suite "Waku Relay":
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)

View File

@ -92,10 +92,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)
## Then
check:
@ -173,14 +173,14 @@ suite "WakuNode - Relay":
# relay handler is called
completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message1)
await node1.publish(some(pubSubTopic), message1)
await sleepAsync(500.millis)
# message2 never gets relayed because of the validator
await node1.publish(pubSubTopic, message2)
await node1.publish(some(pubSubTopic), message2)
await sleepAsync(500.millis)
check:
@ -207,7 +207,7 @@ suite "WakuNode - Relay":
connOk == true
# Node 1 subscribes to topic
nodes[1].subscribe(DefaultPubsubTopic)
nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
await sleepAsync(500.millis)
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
@ -254,10 +254,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)
@ -295,10 +295,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)
@ -340,10 +340,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)
check:
@ -380,10 +380,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)
check:
@ -420,10 +420,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)
@ -440,7 +440,7 @@ suite "WakuNode - Relay":
# subscribe all nodes to a topic
let topic = "topic"
for node in nodes: node.wakuRelay.subscribe(topic, nil)
for node in nodes: discard node.wakuRelay.subscribe(topic, nil)
await sleepAsync(500.millis)
# connect nodes in full mesh
@ -482,3 +482,48 @@ suite "WakuNode - Relay":
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Unsubscribe keep the subscription if other content topics also use the shard":
## Setup
let
nodeKey = generateSecp256k1Key()
node = newTestWakuNode(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
await node.mountRelay()
## Given
let
shard = "/waku/2/rs/1/1"
contentTopicA = DefaultContentTopic
contentTopicB = ContentTopic("/waku/2/default-content1/proto")
contentTopicC = ContentTopic("/waku/2/default-content2/proto")
handler: WakuRelayHandler =
proc(
pubsubTopic: PubsubTopic,
message: WakuMessage
): Future[void] {.gcsafe, raises: [Defect].} =
discard pubsubTopic
discard message
assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard"
## When
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler))
## Then
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB))
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicA))
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicC))
check not node.wakuRelay.isSubscribed(shard)
## Cleanup
await node.stop()

View File

@ -77,7 +77,7 @@ procSuite "WakuNode - RLN relay":
completionFut.complete(true)
# mount the relay handler
node3.subscribe(DefaultPubsubTopic, relayHandler)
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
# prepare the message payload
@ -91,7 +91,7 @@ procSuite "WakuNode - RLN relay":
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(DefaultPubsubTopic, message)
await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
@ -141,8 +141,8 @@ procSuite "WakuNode - RLN relay":
rxMessagesTopic2 = rxMessagesTopic2 + 1
# mount the relay handlers
nodes[2].subscribe(pubsubTopics[0], relayHandler)
nodes[2].subscribe(pubsubTopics[1], relayHandler)
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler))
await sleepAsync(1000.millis)
# generate some messages with rln proofs first. generating
@ -165,8 +165,8 @@ procSuite "WakuNode - RLN relay":
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1: await nodes[0].publish(pubsubTopics[0], msg)
for msg in messages2: await nodes[1].publish(pubsubTopics[1], msg)
for msg in messages1: await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: await nodes[1].publish(some(pubsubTopics[1]), msg)
# wait for gossip to propagate
await sleepAsync(5000.millis)
@ -237,7 +237,7 @@ procSuite "WakuNode - RLN relay":
completionFut.complete(true)
# mount the relay handler
node3.subscribe(DefaultPubsubTopic, relayHandler)
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
# prepare the message payload
@ -266,7 +266,7 @@ procSuite "WakuNode - RLN relay":
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
## never gets called
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(DefaultPubsubTopic, message)
await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
check:
@ -369,7 +369,7 @@ procSuite "WakuNode - RLN relay":
# mount the relay handler for node3
node3.subscribe(DefaultPubsubTopic, relayHandler)
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
## node1 publishes and relays 4 messages to node2
@ -378,10 +378,10 @@ procSuite "WakuNode - RLN relay":
## node2 should detect either of wm1 or wm2 as spam and not relay it
## node2 should relay wm3 to node3
## node2 should not relay wm4 because it has no valid rln proof
await node1.publish(DefaultPubsubTopic, wm1)
await node1.publish(DefaultPubsubTopic, wm2)
await node1.publish(DefaultPubsubTopic, wm3)
await node1.publish(DefaultPubsubTopic, wm4)
await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(some(DefaultPubsubTopic), wm3)
await node1.publish(some(DefaultPubsubTopic), wm4)
await sleepAsync(2000.millis)
let
@ -471,14 +471,14 @@ procSuite "WakuNode - RLN relay":
completionFut3.complete(true)
# mount the relay handler for node2
node2.subscribe(DefaultPubsubTopic, relayHandler)
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
await node1.publish(DefaultPubsubTopic, wm1)
await node1.publish(some(DefaultPubsubTopic), wm1)
await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm2)
await node1.publish(some(DefaultPubsubTopic), wm2)
await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm3)
await node1.publish(some(DefaultPubsubTopic), wm3)
let
res1 = await completionFut1.withTimeout(10.seconds)

View File

@ -61,7 +61,7 @@ suite "WakuNode2 - Validators":
msgReceived += 1
# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)
# Each node publishes 10 signed messages
@ -74,7 +74,7 @@ suite "WakuNode2 - Validators":
# Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(spamProtectedTopic, msg)
await nodes[i].publish(some(spamProtectedTopic), msg)
# Wait for gossip
await sleepAsync(2.seconds)
@ -133,7 +133,7 @@ suite "WakuNode2 - Validators":
await sleepAsync(500.millis)
# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)
# Each node sends 5 messages, signed but with a non-whitelisted key (total = 25)
@ -146,7 +146,7 @@ suite "WakuNode2 - Validators":
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(spamProtectedTopic, msg)
await nodes[i].publish(some(spamProtectedTopic), msg)
# Each node sends 5 messages that are not signed (total = 25)
for i in 0..<5:
@ -154,7 +154,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages that dont contain timestamp (total = 25)
for i in 0..<5:
@ -162,7 +162,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: 0, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way BEFORE than the current timestmap (total = 25)
for i in 0..<5:
@ -171,7 +171,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: beforeTimestamp, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way LATER than the current timestmap (total = 25)
for i in 0..<5:
@ -180,7 +180,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: afterTimestamp, ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Wait for gossip
await sleepAsync(4.seconds)
@ -227,7 +227,7 @@ suite "WakuNode2 - Validators":
msgReceived += 1
# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)
# Add signed message validator to all nodes. They will only route signed messages
@ -255,7 +255,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(spamProtectedTopic, unsignedMessage)
await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
@ -264,7 +264,7 @@ suite "WakuNode2 - Validators":
version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(spamProtectedTopic, msg)
await nodes[0].publish(some(spamProtectedTopic), msg)
# Wait for gossip
await sleepAsync(2.seconds)

View File

@ -22,10 +22,6 @@ import
../testlib/wakucore,
../testlib/wakunode
proc newTestMessageCache(): relay_api.MessageCache =
relay_api.MessageCache.init(capacity=30)
suite "Waku v2 JSON-RPC API - Relay":
asyncTest "subscribe and unsubscribe from topics":
@ -33,7 +29,7 @@ suite "Waku v2 JSON-RPC API - Relay":
let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
await node.mountRelay(topics = @[DefaultPubsubTopic])
await node.mountRelay(@[])
# JSON-RPC server
let
@ -41,7 +37,8 @@ suite "Waku v2 JSON-RPC API - Relay":
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
installRelayApiHandlers(node, server, newTestMessageCache())
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, server, cache)
server.start()
# JSON-RPC client
@ -67,16 +64,14 @@ suite "Waku v2 JSON-RPC API - Relay":
subResp == true
check:
# Node is now subscribed to default + new topics
subTopics.len == 1 + newTopics.len
DefaultPubsubTopic in subTopics
subTopics.len == newTopics.len
newTopics.allIt(it in subTopics)
check:
unsubResp == true
check:
# Node is now unsubscribed from new topics
unsubTopics.len == 1
DefaultPubsubTopic in unsubTopics
unsubTopics.len == 0
newTopics.allIt(it notin unsubTopics)
await server.stop()
@ -110,14 +105,14 @@ suite "Waku v2 JSON-RPC API - Relay":
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (source node)
let
rpcPort = Port(8548)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
installRelayApiHandlers(srcNode, server, newTestMessageCache())
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(srcNode, server, cache)
server.start()
# JSON-RPC client
@ -131,7 +126,7 @@ suite "Waku v2 JSON-RPC API - Relay":
proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
dstHandlerFut.complete((topic, msg))
dstNode.subscribe(pubSubTopic, dstHandler)
dstNode.subscribe((kind: PubsubSub, topic: pubsubTopic), some(dstHandler))
## When
let rpcMessage = WakuMessageRPC(
@ -162,7 +157,7 @@ suite "Waku v2 JSON-RPC API - Relay":
await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop())
asyncTest "get latest messages received from topics cache":
asyncTest "get latest messages received from pubsub topics cache":
## Setup
let
pubSubTopic = "test-jsonrpc-pubsub-topic"
@ -176,24 +171,26 @@ suite "Waku v2 JSON-RPC API - Relay":
await allFutures(srcNode.start(), dstNode.start())
await srcNode.mountRelay(@[pubSubTopic])
await dstNode.mountRelay(@[pubSubTopic])
await dstNode.mountRelay(@[])
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (destination node)
let
rpcPort = Port(8549)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
installRelayApiHandlers(dstNode, server, newTestMessageCache())
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start()
# JSON-RPC client
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
discard await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
## Given
let messages = @[
fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic),
@ -204,7 +201,7 @@ suite "Waku v2 JSON-RPC API - Relay":
## When
for msg in messages:
await srcNode.publish(pubSubTopic, msg)
await srcNode.publish(some(pubSubTopic), msg)
await sleepAsync(200.millis)
@ -222,3 +219,66 @@ suite "Waku v2 JSON-RPC API - Relay":
await server.stop()
await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop())
asyncTest "get latest messages received from content topics cache":
## Setup
let contentTopic = DefaultContentTopic
# Relay nodes setup
let
srcNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
dstNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(srcNode.start(), dstNode.start())
let shard = getShard(contentTopic).expect("Valid Shard")
await srcNode.mountRelay(@[shard])
await dstNode.mountRelay(@[])
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
# RPC server (destination node)
let
rpcPort = Port(8550)
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
server = newRpcHttpServer([ta])
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(dstNode, server, cache)
server.start()
# JSON-RPC client
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
discard await client.post_waku_v2_relay_v1_auto_subscriptions(@[contentTopic])
## Given
let messages = @[
fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 71], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 72], contentTopic=contentTopic),
fakeWakuMessage(payload= @[byte 73], contentTopic=contentTopic)
]
## When
for msg in messages:
await srcNode.publish(none(PubsubTopic), msg)
await sleepAsync(200.millis)
let dstMessages = await client.get_waku_v2_relay_v1_auto_messages(contentTopic)
## Then
check:
dstMessages.len == 4
dstMessages[2].payload == base64.encode(messages[2].payload)
dstMessages[2].contentTopic.get() == messages[2].contentTopic
dstMessages[2].timestamp.get() == messages[2].timestamp
dstMessages[2].version.get() == messages[2].version
## Cleanup
await server.stop()
await server.closeWait()
await allFutures(srcNode.stop(), dstNode.stop())

View File

@ -23,7 +23,6 @@ import
../../waku/waku_relay,
../../waku/waku_filter_v2/subscriptions,
../../waku/waku_filter_v2/common,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/relay/handlers as relay_api,
../../waku/waku_api/rest/relay/client as relay_api_client,
../testlib/wakucore,
@ -74,7 +73,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
testSetup.messageCache = filter_api.MessageCache.init()
installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache)
let topicCache = TopicCache.init()
let topicCache = MessageCache[string].init()
installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache)
testSetup.restServer.start()
@ -244,7 +243,7 @@ suite "Waku v2 Rest API - Filter V2":
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.subscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
# When
var requestBody = FilterSubscribeRequest(requestId: "1234",

View File

@ -94,8 +94,8 @@ suite "Waku v2 Rest API - lightpush":
# Given
let restLightPushTest = await RestLightPushTest.init()
restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic)
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic)
restLightPushTest.consumerNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -120,7 +120,7 @@ suite "Waku v2 Rest API - lightpush":
# Given
let restLightPushTest = await RestLightPushTest.init()
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic)
restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

View File

@ -11,13 +11,13 @@ import
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/waku_api/message_cache,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,
../../waku/waku_api/rest/relay/types,
../../waku/waku_api/rest/relay/handlers as relay_api,
../../waku/waku_api/rest/relay/client as relay_api_client,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_relay,
../../../waku/waku_rln_relay,
../testlib/wakucore,
@ -34,7 +34,7 @@ proc testWakuNode(): WakuNode =
suite "Waku v2 Rest API - Relay":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
asyncTest "Subscribe a node to an array of pubsub topics - POST /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
@ -44,9 +44,9 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init()
let cache = MessageCache[string].init()
installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let pubSubTopics = @[
@ -67,35 +67,39 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
topicCache.isSubscribed("pubsub-topic-1")
topicCache.isSubscribed("pubsub-topic-2")
topicCache.isSubscribed("pubsub-topic-3")
cache.isSubscribed("pubsub-topic-1")
cache.isSubscribed("pubsub-topic-2")
cache.isSubscribed("pubsub-topic-3")
check:
# Node should be subscribed to default + new topics
toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions":
asyncTest "Unsubscribe a node from an array of pubsub topics - DELETE /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
await node.mountRelay(@[
"pubsub-topic-1",
"pubsub-topic-2",
"pubsub-topic-3",
"pubsub-topic-x",
])
let restPort = Port(58012)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init()
topicCache.subscribe("pubsub-topic-1")
topicCache.subscribe("pubsub-topic-2")
topicCache.subscribe("pubsub-topic-3")
topicCache.subscribe("pubsub-topic-x")
let cache = MessageCache[string].init()
cache.subscribe("pubsub-topic-1")
cache.subscribe("pubsub-topic-2")
cache.subscribe("pubsub-topic-3")
cache.subscribe("pubsub-topic-x")
installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let pubSubTopics = @[
@ -117,17 +121,22 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
not topicCache.isSubscribed("pubsub-topic-1")
not topicCache.isSubscribed("pubsub-topic-2")
not topicCache.isSubscribed("pubsub-topic-3")
topicCache.isSubscribed("pubsub-topic-x")
not cache.isSubscribed("pubsub-topic-1")
not node.wakuRelay.isSubscribed("pubsub-topic-1")
not cache.isSubscribed("pubsub-topic-2")
not node.wakuRelay.isSubscribed("pubsub-topic-2")
not cache.isSubscribed("pubsub-topic-3")
not node.wakuRelay.isSubscribed("pubsub-topic-3")
cache.isSubscribed("pubsub-topic-x")
node.wakuRelay.isSubscribed("pubsub-topic-x")
not cache.isSubscribed("pubsub-topic-y")
not node.wakuRelay.isSubscribed("pubsub-topic-y")
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
asyncTest "Get the latest messages for a pubsub topic - GET /relay/v1/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
@ -144,13 +153,13 @@ suite "Waku v2 Rest API - Relay":
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
let topicCache = TopicCache.init()
let cache = MessageCache[string].init()
topicCache.subscribe(pubSubTopic)
cache.subscribe(pubSubTopic)
for msg in messages:
topicCache.addMessage(pubSubTopic, msg)
cache.addMessage(pubSubTopic, msg)
installRelayGetMessagesV1Handler(restServer.router, node, topicCache)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
@ -164,20 +173,20 @@ suite "Waku v2 Rest API - Relay":
response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.contentTopic.get() == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
check:
topicCache.isSubscribed(pubSubTopic)
topicCache.getMessages(pubSubTopic).tryGet().len == 0
cache.isSubscribed(pubSubTopic)
cache.getMessages(pubSubTopic).tryGet().len == 0
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}":
asyncTest "Post a message to a pubsub topic - POST /relay/v1/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
@ -192,26 +201,18 @@ suite "Waku v2 Rest API - Relay":
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let topicCache = TopicCache.init()
let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, topicCache)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe(DefaultPubsubTopic)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
# When
let newTopics = @[
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3")
]
discard await client.relayPostSubscriptionsV1(newTopics)
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: base64.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic),
@ -224,8 +225,193 @@ suite "Waku v2 Rest API - Relay":
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
# TODO: Check for the message to be published to the topic
await restServer.stop()
await restServer.closeWait()
await node.stop()
# Autosharding API
asyncTest "Subscribe a node to an array of content topics - POST /relay/v1/auto/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
ContentTopic("/waku/2/default-content3/proto")
]
let shards = contentTopics.mapIt(getShard(it).expect("Valid Shard")).deduplicate()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayPostSubscriptionsRequest(contentTopics)
let response = await client.relayPostAutoSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
cache.isSubscribed(contentTopics[0])
cache.isSubscribed(contentTopics[1])
cache.isSubscribed(contentTopics[2])
check:
# Node should be subscribed to all shards
toSeq(node.wakuRelay.subscribedTopics).len == shards.len
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Unsubscribe a node from an array of content topics - DELETE /relay/v1/auto/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58012)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
ContentTopic("/waku/2/default-content3/proto"),
ContentTopic("/waku/2/default-contentX/proto")
]
let cache = MessageCache[string].init()
cache.subscribe(contentTopics[0])
cache.subscribe(contentTopics[1])
cache.subscribe(contentTopics[2])
cache.subscribe("/waku/2/default-contentY/proto")
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayDeleteSubscriptionsRequest(contentTopics)
let response = await client.relayDeleteAutoSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not cache.isSubscribed(contentTopics[1])
not cache.isSubscribed(contentTopics[2])
not cache.isSubscribed(contentTopics[3])
cache.isSubscribed("/waku/2/default-contentY/proto")
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Get the latest messages for a content topic - GET /relay/v1/auto/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58013)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let contentTopic = DefaultContentTopic
let messages = @[
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")),
]
let cache = MessageCache[string].init()
cache.subscribe(contentTopic)
for msg in messages:
cache.addMessage(contentTopic, msg)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayGetAutoMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get() == DefaultContentTopic and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
check:
cache.isSubscribed(contentTopic)
cache.getMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to a content topic - POST /relay/v1/auto/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
# RPC server setup
let restPort = Port(58014)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
let cache = MessageCache[string].init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: ContentSub, topic: DefaultContentTopic))
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
# When
let response = await client.relayPostAutoMessagesV1(DefaultContentTopic, RelayWakuMessage(
payload: base64.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022))
))
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -25,6 +25,7 @@ import
libp2p/transports/wstransport
import
../waku_core,
../waku_core/topics/sharding,
../waku_relay,
../waku_archive,
../waku_store,
@ -101,6 +102,7 @@ type
announcedAddresses* : seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
contentTopicHandlers: Table[ContentTopic, TopicHandler]
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
@ -220,62 +222,104 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
node.wakuRelay.subscribe(topic, defaultHandler)
discard node.wakuRelay.subscribe(topic, defaultHandler)
proc subscribe*(node: WakuNode, topic: PubsubTopic) =
proc subscribe*(node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)) =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return
debug "subscribe", pubsubTopic= topic
let (pubsubTopic, contentTopicOp) =
case subscription.kind:
of ContentSub:
let shard = getShard((subscription.topic)).valueOr:
error "Autosharding error", error=error
return
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)
proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted."
(shard, some(subscription.topic))
of PubsubSub: (subscription.topic, none(ContentTopic))
else: return
if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()):
error "Invalid API call to `subscribe`. Was already subscribed"
return
debug "subscribe", pubsubTopic= topic
debug "subscribe", pubsubTopic=pubsubTopic
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)
node.wakuRelay.subscribe(topic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
node.registerRelayDefaultHandler(pubsubTopic)
proc unsubscribe*(node: WakuNode, topic: PubsubTopic) =
## Unsubscribes from a specific PubSub topic.
if handler.isSome():
let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get())
if contentTopicOp.isSome():
node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler
proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) =
## Unsubscribes from a specific PubSub or Content topic.
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return
info "unsubscribe", pubsubTopic=topic
let (pubsubTopic, contentTopicOp) =
case subscription.kind:
of ContentUnsub:
let shard = getShard((subscription.topic)).valueOr:
error "Autosharding error", error=error
return
node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: topic))
node.wakuRelay.unsubscribe(topic)
(shard, some(subscription.topic))
of PubsubUnsub: (subscription.topic, none(ContentTopic))
else: return
if not node.wakuRelay.isSubscribed(pubsubTopic):
error "Invalid API call to `unsubscribe`. Was not subscribed"
return
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
if contentTopicOp.isSome():
# Remove this handler only
var handler: TopicHandler
if node.contentTopicHandlers.pop(contentTopicOp.get(), handler):
debug "unsubscribe", contentTopic=contentTopicOp.get()
node.wakuRelay.unsubscribe(pubsubTopic, handler)
if contentTopicOp.isNone() or node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1:
# Remove all handlers
debug "unsubscribe", pubsubTopic=pubsubTopic
node.wakuRelay.unsubscribeAll(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
proc publish*(
node: WakuNode,
pubsubTopicOp: Option[PubsubTopic],
message: WakuMessage
) {.async, gcsafe.} =
## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard.
## `WakuMessage` should contain a `contentTopic` field for light node functionality.
## It is also used to determine the shard.
if node.wakuRelay.isNil():
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
# TODO: Improve error handling
return
discard await node.wakuRelay.publish(topic, message)
let pubsubTopic = pubsubTopicOp.valueOr:
getShard(message.contentTopic).valueOr:
error "Autosharding error", error=error
return
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)
trace "waku.relay published",
peerId=node.peerId,
pubsubTopic=topic,
hash=topic.digest(message).to0xHex(),
publishTime=getNowInNanosecondTime()
peerId=node.peerId,
pubsubTopic=pubsubTopic,
hash=pubsubTopic.digest(message).to0xHex(),
publishTime=getNowInNanosecondTime()
proc startRelay*(node: WakuNode) {.async.} =
## Setup and start relay protocol
@ -303,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} =
info "relay started successfully"
proc mountRelay*(node: WakuNode,
topics: seq[string] = @[],
pubsubTopics: seq[string] = @[],
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
@ -332,8 +376,8 @@ proc mountRelay*(node: WakuNode,
info "relay mounted successfully"
# Subscribe to topics
for topic in topics:
node.subscribe(topic)
for pubsubTopic in pubsubTopics:
node.subscribe((kind: PubsubSub, topic: pubsubTopic))
## Waku filter

View File

@ -0,0 +1,23 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles
import
../waku_relay,
../waku_core,
./message_cache
##### Message handler
proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)

View File

@ -4,6 +4,11 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[PubsubTopic]): bool
proc post_waku_v2_relay_v1_message(topic: PubsubTopic, message: WakuMessageRPC): bool
proc get_waku_v2_relay_v1_messages(topic: PubsubTopic): seq[WakuMessageRPC]
proc post_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool
proc delete_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool
proc post_waku_v2_relay_v1_auto_message(message: WakuMessageRPC): bool
proc get_waku_v2_relay_v1_auto_messages(topic: ContentTopic): seq[WakuMessageRPC]
# Support for the Relay Private API has been deprecated.
# This API existed for compatibility with the Waku v1 spec and encryption scheme.

View File

@ -17,6 +17,7 @@ import
../../../waku_rln_relay/rln/wrappers,
../../../waku_node,
../../message_cache,
../../cache_handlers,
../message
from std/times import getTime
@ -29,54 +30,51 @@ logScope:
const futTimeout* = 5.seconds # Max time to wait for futures
type
MessageCache* = message_cache.MessageCache[PubsubTopic]
## Waku Relay JSON-RPC API
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
if node.wakuRelay.isNil():
debug "waku relay protocol is nil. skipping json rpc api handlers installation"
return
let topicHandler = proc(topic: PubsubTopic, message: WakuMessage) {.async.} =
cache.addMessage(topic, message)
# The node may already be subscribed to some topics when Relay API handlers
# are installed
for topic in node.wakuRelay.subscribedTopics:
node.subscribe(topic, topicHandler)
cache.subscribe(topic)
server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool:
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache[string]) =
server.rpc("post_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool:
if pubsubTopics.len == 0:
raise newException(ValueError, "No pubsub topic provided")
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics
let newTopics = topics.filterIt(not cache.isSubscribed(it))
let newTopics = pubsubTopics.filterIt(not cache.isSubscribed(it))
for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, topicHandler)
for pubsubTopic in newTopics:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
cache.subscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return true
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool:
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool:
if pubsubTopics.len == 0:
raise newException(ValueError, "No pubsub topic provided")
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics
let subscribedTopics = topics.filterIt(cache.isSubscribed(it))
let subscribedTopics = pubsubTopics.filterIt(cache.isSubscribed(it))
for topic in subscribedTopics:
node.unsubscribe(topic)
cache.unsubscribe(topic)
for pubsubTopic in subscribedTopics:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
cache.unsubscribe(pubsubTopic)
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
return true
server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic
@ -84,10 +82,12 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
if payloadRes.isErr():
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
if msg.contentTopic.isNone():
raise newException(ValueError, "message has no content topic")
var message = WakuMessage(
payload: payloadRes.value,
# TODO: Fail if the message doesn't have a content topic
contentTopic: msg.contentTopic.get(DefaultContentTopic),
contentTopic: msg.contentTopic.get(),
version: msg.version.get(0'u32),
timestamp: msg.timestamp.get(Timestamp(0)),
ephemeral: msg.ephemeral.get(false)
@ -96,7 +96,8 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
# ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing
# to a topic with no connected peers
if pubsubTopic notin node.wakuRelay.subscribedTopics():
raise newException(ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic)
raise newException(
ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic)
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
@ -112,34 +113,127 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
elif result == MessageValidationResult.Spam:
raise newException(ValueError, "Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", pubSubTopic=pubSubTopic
debug "RLN proof validated successfully", pubSubTopic=pubsubTopic
else:
raise newException(ValueError, "Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", pubSubTopic=pubSubTopic
let publishFut = node.publish(pubsubTopic, message)
debug "Publishing message", pubSubTopic=pubsubTopic, rln=defined(rln)
let publishFut = node.publish(some(pubsubTopic), message)
if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish: timed out")
return true
server.rpc("get_waku_v2_relay_v1_messages") do (topic: PubsubTopic) -> seq[WakuMessageRPC]:
server.rpc("get_waku_v2_relay_v1_messages") do (pubsubTopic: PubsubTopic) -> seq[WakuMessageRPC]:
if pubsubTopic == "":
raise newException(ValueError, "Empty pubsub topic")
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called
debug "get_waku_v2_relay_v1_messages", topic=topic
debug "get_waku_v2_relay_v1_messages", topic=pubsubTopic
if not cache.isSubscribed(topic):
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgRes = cache.getMessages(topic, clear=true)
let msgRes = cache.getMessages(pubsubTopic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & topic)
raise newException(ValueError, "Not subscribed to pubsub topic: " & pubsubTopic)
return msgRes.value.map(toWakuMessageRPC)
# Autosharding API
## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility)
## Support for the Relay Private API has been deprecated.
## This API existed for compatibility with the Waku v1/Whisper spec and encryption schemes.
## It is recommended to use the Relay API instead.
server.rpc("post_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool:
if contentTopics.len == 0:
raise newException(ValueError, "No content topic provided")
## Subscribes a node to a list of Content topics
debug "post_waku_v2_relay_v1_auto_subscriptions"
let newTopics = contentTopics.filterIt(not cache.isSubscribed(it))
# Subscribe to all requested topics
for contentTopic in newTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
return true
server.rpc("delete_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool:
if contentTopics.len == 0:
raise newException(ValueError, "No content topic provided")
## Unsubscribes a node from a list of Content topics
debug "delete_waku_v2_relay_v1_auto_subscriptions"
let subscribedTopics = contentTopics.filterIt(cache.isSubscribed(it))
# Unsubscribe all handlers from requested topics
for contentTopic in subscribedTopics:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
cache.unsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
return true
server.rpc("post_waku_v2_relay_v1_auto_message") do (msg: WakuMessageRPC) -> bool:
## Publishes a WakuMessage to a Content topic
debug "post_waku_v2_relay_v1_auto_message"
let payloadRes = base64.decode(msg.payload)
if payloadRes.isErr():
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
if msg.contentTopic.isNone():
raise newException(ValueError, "message has no content topic")
var message = WakuMessage(
payload: payloadRes.value,
contentTopic: msg.contentTopic.get(),
version: msg.version.get(0'u32),
timestamp: msg.timestamp.get(Timestamp(0)),
ephemeral: msg.ephemeral.get(false)
)
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
# append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix()))
if not success:
raise newException(ValueError, "Failed to publish: error appending RLN proof to message")
# validate the message before sending it
let result = node.wakuRlnRelay.validateMessage(message)
if result == MessageValidationResult.Invalid:
raise newException(ValueError, "Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
raise newException(ValueError, "Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
else:
raise newException(ValueError, "Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln)
let publishFut = node.publish(none(PubsubTopic), message)
if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish: timed out")
return true
server.rpc("get_waku_v2_relay_v1_auto_messages") do (contentTopic: ContentTopic) -> seq[WakuMessageRPC]:
if contentTopic == "":
raise newException(ValueError, "Empty content topic")
## Returns all WakuMessages received on a Content topic since the
## last time this method was called
debug "get_waku_v2_relay_v1_auto_messages", topic=contentTopic
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to content topic: " & contentTopic)
return msgRes.value.map(toWakuMessageRPC)

View File

@ -46,10 +46,11 @@ proc decodeBytes*(t: typedesc[string], value: openarray[byte],
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
proc relayPostAutoSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc relayDeleteAutoSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
@ -70,6 +71,8 @@ proc encodeBytes*(value: RelayPostMessagesRequest,
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
proc relayGetAutoMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodGet.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}
proc relayPostAutoMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodPost.}

View File

@ -16,10 +16,12 @@ import
../../../waku_relay/protocol,
../../../waku_rln_relay,
../../../waku_rln_relay/rln/wrappers,
../../../node/waku_node,
../../message_cache,
../../cache_handlers,
../serdes,
../responses,
./types,
./topic_cache
./types
from std/times import getTime
from std/times import toUnix
@ -40,9 +42,11 @@ const futTimeout* = 5.seconds # Max time to wait for futures
#### Request handlers
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions"
const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{topic}"
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache[string]) =
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "post_waku_v2_relay_v1_subscriptions"
@ -65,14 +69,12 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))
for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, cache.messageHandler())
for pubsubTopic in newTopics:
cache.subscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)))
return RestApiResponse.ok()
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "delete_waku_v2_relay_v1_subscriptions"
@ -93,17 +95,13 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak
let req: RelayDeleteSubscriptionsRequest = reqResult.get()
# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribe(topic)
cache.unsubscribe(topic)
for pubsubTopic in req:
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
cache.unsubscribe(pubsubTopic)
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a PubSub topic since the
# ## last time this method was called
@ -127,9 +125,7 @@ proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, c
return resp.get()
proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
if topic.isErr():
return RestApiResponse.badRequest()
let pubSubTopic = topic.get()
@ -178,16 +174,139 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", pubSubTopic=pubSubTopic
if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)):
debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln)
if not (waitFor node.publish(some(pubSubTopic), resMessage.value).withTimeout(futTimeout)):
error "Failed to publish message to topic", pubSubTopic=pubSubTopic
return RestApiResponse.internalServerError("Failed to publish: timedout")
return RestApiResponse.ok()
# Autosharding API
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
installRelayPostSubscriptionsV1Handler(router, node, cache)
installRelayDeleteSubscriptionsV1Handler(router, node, cache)
installRelayGetMessagesV1Handler(router, node, cache)
installRelayPostMessagesV1Handler(router, node)
router.api(MethodPost, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of content topics
# debug "post_waku_v2_relay_v1_auto_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayPostSubscriptionsRequest = reqResult.get()
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))
for contentTopic in newTopics:
cache.subscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache)))
return RestApiResponse.ok()
router.api(MethodDelete, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of content topics
# debug "delete_waku_v2_relay_v1_auto_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayDeleteSubscriptionsRequest = reqResult.get()
# Unsubscribe all handlers from requested topics
for contentTopic in req:
cache.unsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
router.api(MethodGet, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a content topic since the
# ## last time this method was called
# ## TODO: ability to specify a return message limit
# debug "get_waku_v2_relay_v1_auto_messages", topic=topic
if topic.isErr():
return RestApiResponse.badRequest()
let contentTopic = topic.get()
let messages = cache.getMessages(contentTopic, clear=true)
if messages.isErr():
debug "Not subscribed to topic", topic=contentTopic
return RestApiResponse.notFound()
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
debug "An error ocurred while building the json respose", error=resp.error
return RestApiResponse.internalServerError()
return resp.get()
router.api(MethodPost, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
if reqResult.value.contentTopic.isNone():
return RestApiResponse.badRequest()
let resMessage = reqResult.value.toWakuMessage(version = 0)
if resMessage.isErr():
return RestApiResponse.badRequest()
var message = resMessage.get()
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
# append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix()))
if not success:
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
# validate the message before sending it
let result = node.wakuRlnRelay.validateMessage(message)
if result == MessageValidationResult.Invalid:
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", contentTopic=message.contentTopic
else:
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln)
if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)):
error "Failed to publish message to topic", contentTopic=message.contentTopic
return RestApiResponse.internalServerError("Failed to publish: timedout")
return RestApiResponse.ok()

View File

@ -1,33 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles
import
../../../waku_relay,
../../../waku_core,
../../message_cache
export message_cache
##### TopicCache
type TopicCacheResult*[T] = MessageCacheResult[T]
type TopicCache* = MessageCache[PubSubTopic]
##### Message handler
type TopicCacheMessageHandler* = WakuRelayHandler
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
let handler = proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
handler

View File

@ -29,8 +29,8 @@ type
RelayPostMessagesRequest* = RelayWakuMessage
type
RelayPostSubscriptionsRequest* = seq[PubSubTopic]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopic]
RelayPostSubscriptionsRequest* = seq[string]
RelayDeleteSubscriptionsRequest* = seq[string]
#### Type conversion

View File

@ -10,9 +10,5 @@ export
type
SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub
SubscriptionEvent* = object
case kind*: SubscriptionKind
of PubsubSub: pubsubSub*: string
of ContentSub: contentSub*: string
of PubsubUnsub: pubsubUnsub*: string
of ContentUnsub: contentUnsub*: string
SubscriptionEvent* = tuple[kind: SubscriptionKind, topic: string]

View File

@ -307,7 +307,10 @@ proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
debug "Successfully stopped discovery v5 service"
proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]) {.async.} =
proc subscriptionsListener*(
wd: WakuDiscoveryV5,
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
) {.async.} =
## Listen for pubsub topics subscriptions changes
let key = topicSubscriptionQueue.register()
@ -317,8 +320,8 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
# Since we don't know the events we will receive we have to anticipate.
let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub)
let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub)
let subs = events.filterIt(it.kind == PubsubSub).mapIt(it.topic)
let unsubs = events.filterIt(it.kind == PubsubUnsub).mapIt(it.topic)
if subs.len == 0 and unsubs.len == 0:
continue

View File

@ -214,20 +214,21 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
return ValidationResult.Accept
return wrappedValidator
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
debug "subscribe", pubsubTopic=pubsubTopic
# we need to wrap the handler since gossipsub doesnt understand WakuMessage
let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
let decMsg = WakuMessage.decode(data)
if decMsg.isErr():
# fine if triggerSelf enabled, since validators are bypassed
error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error
let fut = newFuture[void]()
fut.complete()
return fut
else:
return handler(pubsubTopic, decMsg.get())
let wrappedHandler =
proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
let decMsg = WakuMessage.decode(data)
if decMsg.isErr():
# fine if triggerSelf enabled, since validators are bypassed
error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error
let fut = newFuture[void]()
fut.complete()
return fut
else:
return handler(pubsubTopic, decMsg.get())
# add the ordered validator to the topic
if not w.validatorInserted.hasKey(pubSubTopic):
@ -240,12 +241,23 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
# subscribe to the topic with our wrapped handler
procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
debug "unsubscribe", pubsubTopic=pubsubTopic
return wrappedHandler
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
debug "unsubscribe all", pubsubTopic=pubsubTopic
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.validatorInserted.del(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
## Unsubscribe this handler on this pubsub topic
debug "unsubscribe", pubsubTopic=pubsubTopic
procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
trace "publish", pubsubTopic=pubsubTopic
let data = message.encode().buffer