diff --git a/apps/wakucanary/README.md b/apps/wakucanary/README.md index 2f8e5275e..6ae4ca3e9 100644 --- a/apps/wakucanary/README.md +++ b/apps/wakucanary/README.md @@ -15,9 +15,11 @@ The following options are available: -p, --protocol Protocol required to be supported: store,relay,lightpush,filter (can be used multiple times). -l, --log-level Sets the log level [=LogLevel.DEBUG]. - -np, --node-port Listening port for waku node [=60000]. + -np, --node-port Listening port for waku node [=60000]. --websocket-secure-key-path Secure websocket key path: '/path/to/key.txt' . --websocket-secure-cert-path Secure websocket Certificate path: '/path/to/cert.txt' . + -c, --cluster-id Cluster ID of the fleet node to check status [Default=1] + -s, --shard Shards index to subscribe to topics [ Argument may be repeated ] ``` diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index 175c962ed..8aa75affd 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -84,6 +84,21 @@ type WakuCanaryConf* = object desc: "Ping the peer node to measure latency", defaultValue: true, name: "ping" .}: bool + shards* {. + desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.", + defaultValue: @[], + name: "shard", + abbr: "s" + .}: seq[uint16] + + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 1, + name: "cluster-id", + abbr: "c" + .}: uint16 + proc parseCmdArg*(T: type chronos.Duration, p: string): T = try: result = chronos.seconds(parseInt(p)) @@ -190,6 +205,13 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = var enrBuilder = EnrBuilder.init(nodeKey) + let relayShards = RelayShards.init(conf.clusterId, conf.shards).valueOr: + error "Relay shards initialization failed", error = error + return 1 + enrBuilder.withWakuRelaySharding(relayShards).isOkOr: + error "Building ENR with relay sharding failed", error = error + return 1 + let recordRes = enrBuilder.build() let record = if recordRes.isErr(): @@ -214,6 +236,8 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = ) let node = builder.build().tryGet() + node.mountMetadata(conf.clusterId).isOkOr: + error "failed to mount waku metadata protocol: ", err = error if conf.ping: try: diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index 92ef250eb..b5d7a2ea9 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, tables, sequtils], + std/[options, tables, sequtils, tempfiles], stew/shims/net as stewNet, testutils/unittests, chronos, @@ -23,6 +23,7 @@ import waku_lightpush/client, waku_lightpush/protocol_metrics, waku_lightpush/rpc, + waku_rln_relay ], ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils], ../resources/payloads @@ -59,7 +60,7 @@ suite "Waku Lightpush - End To End": await server.start() await server.mountRelay() - await server.mountLightpush() + await server.mountLightpush() # without rln-relay client.mountLightpushClient() serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() @@ -103,4 +104,83 @@ suite "Waku Lightpush - End To End": check: publishResponse.isErr() - publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" \ No newline at end of file + publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" + +suite "RLN Proofs as a Lightpush Service": + var + handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handler {.threadvar.}: PushMessageHandler + + server {.threadvar.}: WakuNode + client {.threadvar.}: WakuNode + + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + pubsubTopic {.threadvar.}: PubsubTopic + contentTopic {.threadvar.}: ContentTopic + message {.threadvar.}: WakuMessage + + asyncSetup: + handlerFuture = newPushHandlerFuture() + handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + # mount rln-relay + when defined(rln_v2): + let wakuRlnConfig = WakuRlnConfig( + rlnRelayDynamic: false, + rlnRelayCredIndex: some(1.uint), + rlnRelayUserMessageLimit: 1, + rlnEpochSizeSec: 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode"), + ) + else: + let wakuRlnConfig = WakuRlnConfig( + rlnRelayDynamic: false, + rlnRelayCredIndex: some(1.uint), + rlnEpochSizeSec: 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode"), + ) + + await allFutures(server.start(), client.start()) + await server.start() + + await server.mountRelay() + await server.mountRlnRelay(wakuRlnConfig) + await server.mountLightpush() + client.mountLightpushClient() + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage() + + asyncTeardown: + await server.stop() + + suite "Lightpush attaching RLN proofs": + asyncTest "Message is published when RLN enabled": + # Given a light lightpush client + let lightpushClient = + newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + lightpushClient.mountLightpushClient() + + # When the client publishes a message + let publishResponse = await lightpushClient.lightpushPublish( + some(pubsubTopic), message, serverRemotePeerInfo + ) + + if not publishResponse.isOk(): + echo "Publish failed: ", publishResponse.error() + + # Then the message is relayed to the server + assertResultOk publishResponse \ No newline at end of file diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index b1e89a4fe..94b1ed213 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -6,7 +6,7 @@ else: {.push raises: [].} import - std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles], + std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles, strformat], stew/[results, byteutils], testutils/unittests, chronos, @@ -108,10 +108,11 @@ proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} = tx.to = some(acc) tx.gasPrice = some(gasPrice) - # Send 10 eth to acc + # Send 1000 eth to acc discard await web3.send(tx) let balance = await web3.provider.eth_getBalance(acc, "latest") - assert(balance == ethToWei(1000.u256)) + assert balance == ethToWei(1000.u256), + fmt"Balance is {balance} but expected {ethToWei(1000.u256)}" return (pk, acc) diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index 4d65051a8..5cece660d 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -48,9 +48,10 @@ suite "Waku v2 Rest API - Admin": await allFutures(node1.start(), node2.start(), node3.start()) await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay()) - let restPort = Port(58011) + var restPort = Port(0) let restAddress = parseIpAddress("127.0.0.1") restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installAdminApiHandlers(restServer.router, node1) diff --git a/tests/wakunode_rest/test_rest_cors.nim b/tests/wakunode_rest/test_rest_cors.nim index 78f78a850..0ad24489d 100644 --- a/tests/wakunode_rest/test_rest_cors.nim +++ b/tests/wakunode_rest/test_rest_cors.nim @@ -27,7 +27,7 @@ proc testWakuNode(): WakuNode = privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() bindIp = parseIpAddress("0.0.0.0") extIp = parseIpAddress("127.0.0.1") - port = Port(58000) + port = Port(0) newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) @@ -86,8 +86,7 @@ proc checkResponse( expectedOrigin.isSome() and response.headers.contains("Access-Control-Allow-Origin") and response.headers.getLastString("Access-Control-Allow-Origin") == - expectedOrigin.get() and - response.headers.contains("Access-Control-Allow-Headers") and + expectedOrigin.get() and response.headers.contains("Access-Control-Allow-Headers") and response.headers.getLastString("Access-Control-Allow-Headers") == "Content-Type" ) ): @@ -106,7 +105,7 @@ suite "Waku v2 REST API CORS Handling": await node.start() await node.mountRelay() - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef .init( @@ -116,6 +115,7 @@ suite "Waku v2 REST API CORS Handling": some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"), ) .tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() @@ -158,7 +158,7 @@ suite "Waku v2 REST API CORS Handling": await node.start() await node.mountRelay() - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef .init( @@ -168,6 +168,7 @@ suite "Waku v2 REST API CORS Handling": some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"), ) .tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() @@ -213,10 +214,11 @@ suite "Waku v2 REST API CORS Handling": await node.start() await node.mountRelay() - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort, allowedOrigin = some("*")).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() @@ -259,7 +261,7 @@ suite "Waku v2 REST API CORS Handling": await node.start() await node.mountRelay() - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef .init( @@ -269,6 +271,7 @@ suite "Waku v2 REST API CORS Handling": some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"), ) .tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() diff --git a/tests/wakunode_rest/test_rest_debug.nim b/tests/wakunode_rest/test_rest_debug.nim index 29b954edf..a0fdcf2ba 100644 --- a/tests/wakunode_rest/test_rest_debug.nim +++ b/tests/wakunode_rest/test_rest_debug.nim @@ -26,7 +26,7 @@ proc testWakuNode(): WakuNode = privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() bindIp = parseIpAddress("0.0.0.0") extIp = parseIpAddress("127.0.0.1") - port = Port(58000) + port = Port(0) newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) @@ -37,9 +37,10 @@ suite "Waku v2 REST API - Debug": await node.start() await node.mountRelay() - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() @@ -65,9 +66,10 @@ suite "Waku v2 REST API - Debug": await node.start() await node.mountRelay() - let restPort = Port(58002) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installDebugApiHandlers(restServer.router, node) restServer.start() diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 7828b9d6b..e3893ec08 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -59,13 +59,17 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec ) - let restPort = Port(58011) + var restPort = Port(0) let restAddress = parseIpAddress("127.0.0.1") testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = testSetup.restServer.httpServer.address.port + # update with bound port for client use - let restPort2 = Port(58012) + var restPort2 = Port(0) testSetup.restServerForService = WakuRestServerRef.init(restAddress, restPort2).tryGet() + restPort2 = testSetup.restServerForService.httpServer.address.port + # update with bound port for client use # through this one we will see if messages are pushed according to our content topic sub testSetup.messageCache = MessageCache.init() diff --git a/tests/wakunode_rest/test_rest_health.nim b/tests/wakunode_rest/test_rest_health.nim index 7a24fcaf0..f2ff967ff 100644 --- a/tests/wakunode_rest/test_rest_health.nim +++ b/tests/wakunode_rest/test_rest_health.nim @@ -45,9 +45,10 @@ suite "Waku v2 REST API - health": healthMonitor.setOverallHealth(HealthStatus.INITIALIZING) - let restPort = Port(58001) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installHealthApiHandler(restServer.router, healthMonitor) restServer.start() diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index e2d66155e..217540f46 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -74,9 +74,11 @@ proc init( testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec ) - let restPort = Port(58011) + var restPort = Port(0) let restAddress = parseIpAddress("127.0.0.1") testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = testSetup.restServer.httpServer.address.port + # update with bound port for client use installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode) diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index cb5c0cd87..4c653b55f 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -92,9 +92,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58011) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -170,9 +171,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58011) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -239,9 +241,10 @@ procSuite "Waku Rest API - Store v3": let node = testWakuNode() await node.start() - let restPort = Port(58012) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -334,9 +337,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58013) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -406,9 +410,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58014) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -494,9 +499,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58015) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -549,9 +555,10 @@ procSuite "Waku Rest API - Store v3": await node.start() await node.mountRelay() - let restPort = Port(58016) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -617,9 +624,10 @@ procSuite "Waku Rest API - Store v3": let node = testWakuNode() await node.start() - let restPort = Port(58017) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() @@ -680,7 +688,6 @@ procSuite "Waku Rest API - Store v3": var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() - restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) @@ -726,9 +733,10 @@ procSuite "Waku Rest API - Store v3": let node = testWakuNode() await node.start() - let restPort = Port(58018) + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restPort = restServer.httpServer.address.port # update with bound port for client use installStoreApiHandlers(restServer.router, node) restServer.start() diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 679041580..a2993c524 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -380,8 +380,8 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = pm.peerStore.hasPeer(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): - let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]" - let otherShardsString = "[ " & metadata.shards.join(", ") & "]" + let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]" + let otherShardsString = "[ " & metadata.shards.join(", ") & " ]" reason = "no shards in common: my_shards = " & myShardsString & " others_shards = " & otherShardsString diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 77cba5567..31f571e8a 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -44,6 +44,7 @@ import ../waku_lightpush/common, ../waku_lightpush/protocol, ../waku_lightpush/self_req_handler, + ../waku_lightpush/callbacks, ../waku_enr, ../waku_peer_exchange, ../waku_rln_relay, @@ -224,19 +225,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return - proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - notice "waku.relay received", - my_peer_id = node.peerId, - pubsubTopic = topic, - msg_hash = topic.computeMessageHash(msg).to0xHex(), - receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len - - let msgSizeKB = msg.payload.len / 1000 - - waku_node_messages.inc(labelValues = ["relay"]) - waku_histogram_message_size.observe(msgSizeKB) - proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuFilter.isNil(): return @@ -252,7 +240,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = - await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) @@ -389,6 +376,61 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" +proc generateRelayObserver(node: WakuNode): PubSubObserver = + proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = + for msg in msgs.messages: + let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr: + warn "Error generating message id", + my_peer_id = node.peerId, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_id_short = shortLog(msg_id) + + let wakuMessage = WakuMessage.decode(msg.data).valueOr: + warn "Error decoding to Waku Message", + my_peer_id = node.peerId, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() + + if onRecv: + notice "received relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + let msgSizeKB = wakuMessage.payload.len / 1000 + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) + else: + notice "sent relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + to_peer_id = peer.peerId, + topic = msg.topic, + sentTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + logMessageInfo(peer, msgs, onRecv = true) + + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = + discard + + return PubSubObserver(onRecv: onRecv, onSend: onSend) + proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], @@ -409,6 +451,11 @@ proc mountRelay*( node.wakuRelay = initRes.value + # register relay observers for logging + debug "Registering Relay observers" + let observerLogger = node.generateRelayObserver() + node.wakuRelay.addObserver(observerLogger) + ## Add peer exchange handler if peerExchangeHandler.isSome(): node.wakuRelay.parameters.enablePX = true @@ -930,34 +977,22 @@ proc mountLightPush*( node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit ) {.async.} = info "mounting light push" + + var pushHandler = + if node.wakuRelay.isNil: + debug "mounting lightpush without relay (nil)" + getNilPushHandler() + else: + debug "mounting lightpush with relay" + let rlnPeer = + if isNil(node.wakuRlnRelay): + debug "mounting lightpush without rln-relay" + none(WakuRLNRelay) + else: + debug "mounting lightpush with rln-relay" + some(node.wakuRlnRelay) + getRelayPushHandler(node.wakuRelay, rlnPeer) - var pushHandler: PushMessageHandler - if node.wakuRelay.isNil(): - debug "mounting lightpush without relay (nil)" - pushHandler = proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = - return err("no waku relay found") - else: - pushHandler = proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = - let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message) - if validationRes.isErr(): - return err(validationRes.error) - - let publishedCount = - await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) - - if publishedCount == 0: - ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 - let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - notice "Lightpush request has not been published to any peers", - msg_hash = msgHash - - return ok() - - debug "mounting lightpush with relay" node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim index 347cddaa3..3ecf88fa1 100644 --- a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim +++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim @@ -70,6 +70,9 @@ proc addPartitionInfo*( trace "Adding partition info" self.partitions.addLast(partitionInfo) +proc clearPartitionInfo*(self: PartitionManager) = + self.partitions.clear() + proc removeOldestPartitionName*(self: PartitionManager) = ## Simply removed the partition from the tracked/known partitions queue. ## Just remove it and ignore it. diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 00bee087b..516d8d70e 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -866,6 +866,11 @@ proc acquireDatabaseLock*( s: PostgresDriver, lockId: int = 841886 ): Future[ArchiveDriverResult[void]] {.async.} = ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time) + ## This should only be used in the migrations module because this approach doesn't ensure + ## that the lock is acquired/released by the same connection. The preferable "lock" + ## approach is using the "performWriteQueryWithLock" proc. However, we can't use + ## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL + ## scripts. let locked = ( await s.getStr( fmt""" @@ -908,6 +913,46 @@ proc performWriteQuery*( return ok() +const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock" + +proc performWriteQueryWithLock*( + self: PostgresDriver, queryToProtect: string +): Future[ArchiveDriverResult[void]] {.async.} = + ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it + debug "performWriteQueryWithLock", queryToProtect + let query = + fmt""" + DO $$ + DECLARE + lock_acquired boolean; + BEGIN + -- Try to acquire the advisory lock + lock_acquired := pg_try_advisory_lock(123456789); + + IF NOT lock_acquired THEN + RAISE EXCEPTION '{COULD_NOT_ACQUIRE_ADVISORY_LOCK}'; + END IF; + + -- Perform the query + BEGIN + {queryToProtect} + EXCEPTION WHEN OTHERS THEN + -- Ensure the lock is released if an error occurs + PERFORM pg_advisory_unlock(123456789); + RAISE; + END; + + -- Release the advisory lock after the query completes successfully + PERFORM pg_advisory_unlock(123456789); + END $$; +""" + (await self.performWriteQuery(query)).isOkOr: + debug "protected query ended with error", error = $error + return err("protected query ended with error:" & $error) + + debug "protected query ended correctly" + return ok() + proc addPartition( self: PostgresDriver, startTime: Timestamp ): Future[ArchiveDriverResult[void]] {.async.} = @@ -930,21 +975,15 @@ proc addPartition( "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " & "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" - # Lock the db - (await self.acquireDatabaseLock()).isOkOr: - error "failed to acquire lock", error = error - return err("failed to lock the db") - - defer: - (await self.releaseDatabaseLock()).isOkOr: - error "failed to release lock", error = error - return err("failed to unlock the db.") - - (await self.performWriteQuery(createPartitionQuery)).isOkOr: + (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr: if error.contains("already exists"): debug "skip create new partition as it already exists: ", skipped_error = $error return ok() + if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): + debug "skip create new partition because the advisory lock is acquired by other" + return ok() + ## for any different error, just consider it return err(fmt"error adding partition [{partitionName}]: " & $error) @@ -953,9 +992,12 @@ proc addPartition( self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`) return ok() -proc initializePartitionsInfo( +proc refreshPartitionsInfo( self: PostgresDriver ): Future[ArchiveDriverResult[void]] {.async.} = + debug "refreshPartitionsInfo" + self.partitionMngr.clearPartitionInfo() + let partitionNamesRes = await self.getPartitionsList() if not partitionNamesRes.isOk(): return err("Could not retrieve partitions list: " & $partitionNamesRes.error) @@ -994,13 +1036,13 @@ proc loopPartitionFactory( debug "starting loopPartitionFactory" - ## First of all, let's make the 'partition_manager' aware of the current partitions - (await self.initializePartitionsInfo()).isOkOr: - onFatalError("issue in loopPartitionFactory: " & $error) - while true: trace "Check if we need to create a new partition" + ## Let's make the 'partition_manager' aware of the current partitions + (await self.refreshPartitionsInfo()).isOkOr: + onFatalError("issue in loopPartitionFactory: " & $error) + let now = times.now().toTime().toUnix() if self.partitionMngr.isEmpty(): diff --git a/waku/waku_lightpush/callbacks.nim b/waku/waku_lightpush/callbacks.nim new file mode 100644 index 000000000..6d4bff3d2 --- /dev/null +++ b/waku/waku_lightpush/callbacks.nim @@ -0,0 +1,63 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + ../waku_core, + ../waku_relay, + ./common, + ./protocol, + ../waku_rln_relay, + ../waku_rln_relay/protocol_types, + ../common/ratelimit +import + std/times, + libp2p/peerid, + stew/byteutils + +proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] = + # check if the message already has RLN proof + if message.proof.len > 0: + return ok(message) + + if rlnPeer.isNone(): + notice "Publishing message without RLN proof" + return ok(message) + # generate and append RLN proof + let + time = getTime().toUnix() + senderEpochTime = float64(time) + var msgWithProof = message + rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr: + return err(error) + return ok(msgWithProof) + +proc getNilPushHandler*(): PushMessageHandler = + return proc( + peer: PeerId, pubsubTopic: string, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + return err("no waku relay found") + +proc getRelayPushHandler*( + wakuRelay: WakuRelay, + rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]() +): PushMessageHandler = + return proc( + peer: PeerId, pubsubTopic: string, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + # append RLN proof + let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message) + if msgWithProof.isErr(): + return err(msgWithProof.error) + + (await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr: + return err(error) + + let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value) + if publishedCount == 0: + ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + notice "Lightpush request has not been published to any peers", msg_hash = msgHash + + return ok() \ No newline at end of file diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 8cbfcf36b..1e270ccc7 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -115,4 +115,4 @@ proc new*( requestRateLimiter: newTokenBucket(rateLimitSetting), ) wl.initProtocolHandler() - return wl + return wl \ No newline at end of file diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 03d5b596e..ca59a6899 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -180,6 +180,9 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) +proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = + procCall GossipSub(w).addObserver(observer) + method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start()