From 641aa48696dc81ce90c718fd484efa68fe5968a1 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 12 Jun 2024 15:07:33 +0200
Subject: [PATCH 1/6] Use random ports in rest tests instead of fixed ones
 (#2804)

---
 tests/wakunode_rest/test_rest_admin.nim     |  3 ++-
 tests/wakunode_rest/test_rest_cors.nim      | 17 +++++++------
 tests/wakunode_rest/test_rest_debug.nim     |  8 +++---
 tests/wakunode_rest/test_rest_filter.nim    |  8 ++++--
 tests/wakunode_rest/test_rest_health.nim    |  3 ++-
 tests/wakunode_rest/test_rest_lightpush.nim |  4 ++-
 tests/wakunode_rest/test_rest_store.nim     | 28 +++++++++++++--------
 7 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim
index 4d65051a8..5cece660d 100644
--- a/tests/wakunode_rest/test_rest_admin.nim
+++ b/tests/wakunode_rest/test_rest_admin.nim
@@ -48,9 +48,10 @@ suite "Waku v2 Rest API - Admin":
     await allFutures(node1.start(), node2.start(), node3.start())
     await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay())

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("127.0.0.1")
     restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installAdminApiHandlers(restServer.router, node1)

diff --git a/tests/wakunode_rest/test_rest_cors.nim b/tests/wakunode_rest/test_rest_cors.nim
index 78f78a850..0ad24489d 100644
--- a/tests/wakunode_rest/test_rest_cors.nim
+++ b/tests/wakunode_rest/test_rest_cors.nim
@@ -27,7 +27,7 @@ proc testWakuNode(): WakuNode =
     privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
     bindIp = parseIpAddress("0.0.0.0")
     extIp = parseIpAddress("127.0.0.1")
-    port = Port(58000)
+    port = Port(0)

   newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))

@@ -86,8 +86,7 @@ proc checkResponse(
       expectedOrigin.isSome() and
       response.headers.contains("Access-Control-Allow-Origin") and
       response.headers.getLastString("Access-Control-Allow-Origin") ==
-        expectedOrigin.get() and
-      response.headers.contains("Access-Control-Allow-Headers") and
+        expectedOrigin.get() and response.headers.contains("Access-Control-Allow-Headers") and
       response.headers.getLastString("Access-Control-Allow-Headers") == "Content-Type"
     )
   ):
@@ -106,7 +105,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
       .init(
@@ -116,6 +115,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
       )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -158,7 +158,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
       .init(
@@ -168,6 +168,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
      )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -213,10 +214,11 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer =
       WakuRestServerRef.init(restAddress, restPort, allowedOrigin = some("*")).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -259,7 +261,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
       .init(
@@ -269,6 +271,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
       )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()

diff --git a/tests/wakunode_rest/test_rest_debug.nim b/tests/wakunode_rest/test_rest_debug.nim
index 29b954edf..a0fdcf2ba 100644
--- a/tests/wakunode_rest/test_rest_debug.nim
+++ b/tests/wakunode_rest/test_rest_debug.nim
@@ -26,7 +26,7 @@ proc testWakuNode(): WakuNode =
     privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
     bindIp = parseIpAddress("0.0.0.0")
     extIp = parseIpAddress("127.0.0.1")
-    port = Port(58000)
+    port = Port(0)

   newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))

@@ -37,9 +37,10 @@ suite "Waku v2 REST API - Debug":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -65,9 +66,10 @@ suite "Waku v2 REST API - Debug":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58002)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()

diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim
index 7828b9d6b..e3893ec08 100644
--- a/tests/wakunode_rest/test_rest_filter.nim
+++ b/tests/wakunode_rest/test_rest_filter.nim
@@ -59,13 +59,17 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
     testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec
   )

-  let restPort = Port(58011)
+  var restPort = Port(0)
   let restAddress = parseIpAddress("127.0.0.1")
   testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+  restPort = testSetup.restServer.httpServer.address.port
+    # update with bound port for client use

-  let restPort2 = Port(58012)
+  var restPort2 = Port(0)
   testSetup.restServerForService =
     WakuRestServerRef.init(restAddress, restPort2).tryGet()
+  restPort2 = testSetup.restServerForService.httpServer.address.port
+    # update with bound port for client use

   # through this one we will see if messages are pushed according to our content topic sub
   testSetup.messageCache = MessageCache.init()

diff --git a/tests/wakunode_rest/test_rest_health.nim b/tests/wakunode_rest/test_rest_health.nim
index 7a24fcaf0..f2ff967ff 100644
--- a/tests/wakunode_rest/test_rest_health.nim
+++ b/tests/wakunode_rest/test_rest_health.nim
@@ -45,9 +45,10 @@ suite "Waku v2 REST API - health":

     healthMonitor.setOverallHealth(HealthStatus.INITIALIZING)

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installHealthApiHandler(restServer.router, healthMonitor)
     restServer.start()

diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim
index e2d66155e..217540f46 100644
--- a/tests/wakunode_rest/test_rest_lightpush.nim
+++ b/tests/wakunode_rest/test_rest_lightpush.nim
@@ -74,9 +74,11 @@ proc init(
     testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
   )

-  let restPort = Port(58011)
+  var restPort = Port(0)
   let restAddress = parseIpAddress("127.0.0.1")
   testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+  restPort = testSetup.restServer.httpServer.address.port
+    # update with bound port for client use

   installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode)

diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim
index cb5c0cd87..4c653b55f 100644
--- a/tests/wakunode_rest/test_rest_store.nim
+++ b/tests/wakunode_rest/test_rest_store.nim
@@ -92,9 +92,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -170,9 +171,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -239,9 +241,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58012)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -334,9 +337,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58013)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -406,9 +410,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58014)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -494,9 +499,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58015)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -549,9 +555,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58016)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -617,9 +624,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58017)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -680,7 +688,6 @@ procSuite "Waku Rest API - Store v3":
     var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
-    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
@@ -726,9 +733,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58018)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
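Every hunk in this patch applies the same pattern: bind the REST server to Port(0) so the OS assigns a free ephemeral port, then read the actually bound port back from the server handle before building any client against it. A minimal sketch of that pattern, assuming the WakuRestServerRef API visible in the hunks above; the client construction at the end is an illustrative assumption, not part of this patch:

    # ask the OS for any free port instead of hard-coding one
    var restPort = Port(0)
    let restAddress = parseIpAddress("127.0.0.1")
    let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
    # the real port is only known after binding; read it back
    restPort = restServer.httpServer.address.port
    # clients must be created after this point, with the updated restPort
    let client = newRestHttpClient(initTAddress(restAddress, restPort))

This removes the flakiness of the fixed ports (58000-58018 above), which fail whenever a previous test run or an unrelated process still holds the port.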
From 767e89d5f1643df94fc9d165e7e16026b5bc600f Mon Sep 17 00:00:00 2001
From: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Date: Wed, 12 Jun 2024 22:27:10 +0200
Subject: [PATCH 2/6] added message to failed assert (#2805)

---
 tests/waku_rln_relay/test_rln_group_manager_onchain.nim | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim
index a3e412f41..571a30ed0 100644
--- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim
+++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim
@@ -6,7 +6,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles],
+  std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles, strformat],
   stew/[results, byteutils],
   testutils/unittests,
   chronos,
@@ -119,10 +119,11 @@ proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} =
   tx.to = some(acc)
   tx.gasPrice = some(gasPrice)

-  # Send 10 eth to acc
+  # Send 1000 eth to acc
   discard await web3.send(tx)
   let balance = await web3.provider.eth_getBalance(acc, "latest")
-  assert(balance == ethToWei(1000.u256))
+  assert balance == ethToWei(1000.u256),
+    fmt"Balance is {balance} but expected {ethToWei(1000.u256)}"

   return (pk, acc)
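Nim's assert accepts an optional message as a second argument, so the change above turns a bare boolean failure into one that reports the offending values. A standalone illustration of the same idiom, with made-up numbers:

    import std/strformat

    let expected = 1000
    let balance = 10
    assert balance == expected,
      fmt"Balance is {balance} but expected {expected}"
    # on failure the raised AssertionDefect carries:
    #   Balance is 10 but expected 1000

The fmt macro comes from std/strformat, which is why the import list gains that module in the first hunk.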
From 78c9172aaec288f0d0c68d920812b72f9e8d5332 Mon Sep 17 00:00:00 2001
From: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Date: Thu, 13 Jun 2024 18:35:56 +0200
Subject: [PATCH 3/6] chore: adding observers for message logging (#2800)

---
 waku/node/waku_node.nim      | 74 +++++++++++++++++++++++++++++-------
 waku/waku_relay/protocol.nim |  3 ++
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 77cba5567..3e28a7607 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
   if node.wakuRelay.isSubscribed(topic):
     return

-  proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
-    notice "waku.relay received",
-      my_peer_id = node.peerId,
-      pubsubTopic = topic,
-      msg_hash = topic.computeMessageHash(msg).to0xHex(),
-      receivedTime = getNowInNanosecondTime(),
-      payloadSizeBytes = msg.payload.len
-
-    let msgSizeKB = msg.payload.len / 1000
-
-    waku_node_messages.inc(labelValues = ["relay"])
-    waku_histogram_message_size.observe(msgSizeKB)
-
   proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
     if node.wakuFilter.isNil():
       return
@@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
   let defaultHandler = proc(
       topic: PubsubTopic, msg: WakuMessage
   ): Future[void] {.async, gcsafe.} =
-    await traceHandler(topic, msg)
     await filterHandler(topic, msg)
     await archiveHandler(topic, msg)

@@ -389,6 +375,61 @@ proc startRelay*(node: WakuNode) {.async.} =

   info "relay started successfully"

+proc generateRelayObserver(node: WakuNode): PubSubObserver =
+  proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
+    for msg in msgs.messages:
+      let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
+        warn "Error generating message id",
+          my_peer_id = node.peerId,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          error = $error
+        continue
+
+      let msg_id_short = shortLog(msg_id)
+
+      let wakuMessage = WakuMessage.decode(msg.data).valueOr:
+        warn "Error decoding to Waku Message",
+          my_peer_id = node.peerId,
+          msg_id = msg_id_short,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          error = $error
+        continue
+
+      let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
+
+      if onRecv:
+        notice "received relay message",
+          my_peer_id = node.peerId,
+          msg_hash = msg_hash,
+          msg_id = msg_id_short,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          receivedTime = getNowInNanosecondTime(),
+          payloadSizeBytes = wakuMessage.payload.len
+
+        let msgSizeKB = wakuMessage.payload.len / 1000
+        waku_node_messages.inc(labelValues = ["relay"])
+        waku_histogram_message_size.observe(msgSizeKB)
+      else:
+        notice "sent relay message",
+          my_peer_id = node.peerId,
+          msg_hash = msg_hash,
+          msg_id = msg_id_short,
+          to_peer_id = peer.peerId,
+          topic = msg.topic,
+          sentTime = getNowInNanosecondTime(),
+          payloadSizeBytes = wakuMessage.payload.len
+
+  proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
+    logMessageInfo(peer, msgs, onRecv = true)
+
+  proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
+    discard
+
+  return PubSubObserver(onRecv: onRecv, onSend: onSend)
+
 proc mountRelay*(
     node: WakuNode,
     pubsubTopics: seq[string] = @[],
@@ -409,6 +450,11 @@ proc mountRelay*(

   node.wakuRelay = initRes.value

+  # register relay observers for logging
+  debug "Registering Relay observers"
+  let observerLogger = node.generateRelayObserver()
+  node.wakuRelay.addObserver(observerLogger)
+
   ## Add peer exchange handler
   if peerExchangeHandler.isSome():
     node.wakuRelay.parameters.enablePX = true

diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index 03d5b596e..ca59a6899 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -180,6 +180,9 @@ proc addValidator*(
 ) {.gcsafe.} =
   w.wakuValidators.add((handler, errorMessage))

+proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
+  procCall GossipSub(w).addObserver(observer)
+
 method start*(w: WakuRelay) {.async, base.} =
   debug "start"
   await procCall GossipSub(w).start()
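This commit replaces the per-topic traceHandler with gossipsub observers: hooks that nim-libp2p invokes on every RPC envelope a peer sends or receives, independently of topic subscription. Reduced to its skeleton under the PubSubObserver shape used above (the full logging fields are in the patch; this trims them):

    proc generateRelayObserver(node: WakuNode): PubSubObserver =
      proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
        # called for every incoming RPC batch; one RPCMsg may carry many messages
        for msg in msgs.messages:
          notice "received relay message", from_peer_id = peer.peerId, topic = msg.topic

      proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
        discard # this revision only wires up the receive path

      PubSubObserver(onRecv: onRecv, onSend: onSend)

    # registered once, at mount time:
    node.wakuRelay.addObserver(node.generateRelayObserver())

One observer sees traffic for all topics, which is why the old handler that had to be registered per pubsub topic could be deleted.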
"wakunode"), + ) + + await allFutures(server.start(), client.start()) + await server.start() + + await server.mountRelay() + await server.mountRlnRelay(wakuRlnConfig) + await server.mountLightpush() + client.mountLightpushClient() + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage() + + asyncTeardown: + await server.stop() + + suite "Lightpush attaching RLN proofs": + asyncTest "Message is published when RLN enabled": + # Given a light lightpush client + let lightpushClient = + newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + lightpushClient.mountLightpushClient() + + # When the client publishes a message + let publishResponse = await lightpushClient.lightpushPublish( + some(pubsubTopic), message, serverRemotePeerInfo + ) + + if not publishResponse.isOk(): + echo "Publish failed: ", publishResponse.error() + + # Then the message is relayed to the server + assertResultOk publishResponse \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3e28a7607..31f571e8a 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -44,6 +44,7 @@ import ../waku_lightpush/common, ../waku_lightpush/protocol, ../waku_lightpush/self_req_handler, + ../waku_lightpush/callbacks, ../waku_enr, ../waku_peer_exchange, ../waku_rln_relay, @@ -976,34 +977,22 @@ proc mountLightPush*( node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit ) {.async.} = info "mounting light push" + + var pushHandler = + if node.wakuRelay.isNil: + debug "mounting lightpush without relay (nil)" + getNilPushHandler() + else: + debug "mounting lightpush with relay" + let rlnPeer = + if isNil(node.wakuRlnRelay): + debug "mounting lightpush without rln-relay" + none(WakuRLNRelay) + else: + debug "mounting lightpush with rln-relay" + some(node.wakuRlnRelay) + getRelayPushHandler(node.wakuRelay, rlnPeer) - var pushHandler: PushMessageHandler - if node.wakuRelay.isNil(): - debug "mounting lightpush without relay (nil)" - pushHandler = proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = - return err("no waku relay found") - else: - pushHandler = proc( - peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = - let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message) - if validationRes.isErr(): - return err(validationRes.error) - - let publishedCount = - await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) - - if publishedCount == 0: - ## Agreed change expected to the lightpush protocol to better handle such case. 
-        ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
-        let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
-        notice "Lightpush request has not been published to any peers",
-          msg_hash = msgHash
-
-      return ok()
-
-  debug "mounting lightpush with relay"
   node.wakuLightPush =
     WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))

diff --git a/waku/waku_lightpush/callbacks.nim b/waku/waku_lightpush/callbacks.nim
new file mode 100644
index 000000000..6d4bff3d2
--- /dev/null
+++ b/waku/waku_lightpush/callbacks.nim
@@ -0,0 +1,63 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  ../waku_core,
+  ../waku_relay,
+  ./common,
+  ./protocol,
+  ../waku_rln_relay,
+  ../waku_rln_relay/protocol_types,
+  ../common/ratelimit
+import
+  std/times,
+  libp2p/peerid,
+  stew/byteutils
+
+proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] =
+  # check if the message already has RLN proof
+  if message.proof.len > 0:
+    return ok(message)
+
+  if rlnPeer.isNone():
+    notice "Publishing message without RLN proof"
+    return ok(message)
+  # generate and append RLN proof
+  let
+    time = getTime().toUnix()
+    senderEpochTime = float64(time)
+  var msgWithProof = message
+  rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr:
+    return err(error)
+  return ok(msgWithProof)
+
+proc getNilPushHandler*(): PushMessageHandler =
+  return proc(
+      peer: PeerId, pubsubTopic: string, message: WakuMessage
+  ): Future[WakuLightPushResult[void]] {.async.} =
+    return err("no waku relay found")
+
+proc getRelayPushHandler*(
+    wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
+): PushMessageHandler =
+  return proc(
+      peer: PeerId, pubsubTopic: string, message: WakuMessage
+  ): Future[WakuLightPushResult[void]] {.async.} =
+    # append RLN proof
+    let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
+    if msgWithProof.isErr():
+      return err(msgWithProof.error)
+
+    (await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr:
+      return err(error)
+
+    let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value)
+    if publishedCount == 0:
+      ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
+      let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
+      notice "Lightpush request has not been published to any peers", msg_hash = msgHash
+
+    return ok()
\ No newline at end of file

diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim
index 8cbfcf36b..1e270ccc7 100644
--- a/waku/waku_lightpush/protocol.nim
+++ b/waku/waku_lightpush/protocol.nim
@@ -115,4 +115,4 @@ proc new*(
     requestRateLimiter: newTokenBucket(rateLimitSetting),
   )
   wl.initProtocolHandler()
-  return wl
+  return wl
\ No newline at end of file
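The service-side flow that callbacks.nim introduces is: if the lightpush service node has RLN mounted and the incoming message carries no proof, generate and attach one before validating and relaying. A condensed view of how mountLightPush wires it, using only the names defined in the patch (surrounding node setup assumed):

    let rlnPeer =
      if isNil(node.wakuRlnRelay):
        none(WakuRLNRelay) # no RLN: messages pass through unchanged
      else:
        some(node.wakuRlnRelay)
    let pushHandler = getRelayPushHandler(node.wakuRelay, rlnPeer)

    # inside the returned handler, per request:
    let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)

The practical effect is that resource-restricted clients can publish through a lightpush service node without running RLN themselves; the service node spends the proof on their behalf.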
From 0bd3087de90f0ab11ee1606be87834cdd1ba4e9b Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 14 Jun 2024 10:07:41 +0200
Subject: [PATCH 5/6] postgres_driver: better sync lock in partition creation
 (#2809)

With the original approach there were cases where one connection acquired
the lock and a different connection tried to release it, causing an
unrecoverable failure that brought the node down.
---
 .../postgres_driver/partitions_manager.nim |  3 +
 .../postgres_driver/postgres_driver.nim    | 74 +++++++++++++++----
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim
index 347cddaa3..3ecf88fa1 100644
--- a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim
+++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim
@@ -70,6 +70,9 @@ proc addPartitionInfo*(
   trace "Adding partition info"
   self.partitions.addLast(partitionInfo)

+proc clearPartitionInfo*(self: PartitionManager) =
+  self.partitions.clear()
+
 proc removeOldestPartitionName*(self: PartitionManager) =
   ## Simply removes the partition from the tracked/known partitions queue.
   ## Just remove it and ignore it.

diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
index 00bee087b..516d8d70e 100644
--- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
+++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -866,6 +866,11 @@ proc acquireDatabaseLock*(
     s: PostgresDriver, lockId: int = 841886
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
+  ## This should only be used in the migrations module because this approach doesn't ensure
+  ## that the lock is acquired/released by the same connection. The preferable "lock"
+  ## approach is using the "performWriteQueryWithLock" proc. However, we can't use
+  ## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
+  ## scripts.
   let locked = (
     await s.getStr(
       fmt"""
@@ -908,6 +913,46 @@ proc performWriteQuery*(

   return ok()

+const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
+
+proc performWriteQueryWithLock*(
+    self: PostgresDriver, queryToProtect: string
+): Future[ArchiveDriverResult[void]] {.async.} =
+  ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
+  debug "performWriteQueryWithLock", queryToProtect
+  let query =
+    fmt"""
+    DO $$
+    DECLARE
+      lock_acquired boolean;
+    BEGIN
+      -- Try to acquire the advisory lock
+      lock_acquired := pg_try_advisory_lock(123456789);
+
+      IF NOT lock_acquired THEN
+        RAISE EXCEPTION '{COULD_NOT_ACQUIRE_ADVISORY_LOCK}';
+      END IF;
+
+      -- Perform the query
+      BEGIN
+        {queryToProtect}
+      EXCEPTION WHEN OTHERS THEN
+        -- Ensure the lock is released if an error occurs
+        PERFORM pg_advisory_unlock(123456789);
+        RAISE;
+      END;
+
+      -- Release the advisory lock after the query completes successfully
+      PERFORM pg_advisory_unlock(123456789);
+    END $$;
+"""
+  (await self.performWriteQuery(query)).isOkOr:
+    debug "protected query ended with error", error = $error
+    return err("protected query ended with error:" & $error)
+
+  debug "protected query ended correctly"
+  return ok()
+
 proc addPartition(
     self: PostgresDriver, startTime: Timestamp
 ): Future[ArchiveDriverResult[void]] {.async.} =
@@ -930,21 +975,15 @@ proc addPartition(
     "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
     "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"

-  # Lock the db
-  (await self.acquireDatabaseLock()).isOkOr:
-    error "failed to acquire lock", error = error
-    return err("failed to lock the db")
-
-  defer:
-    (await self.releaseDatabaseLock()).isOkOr:
-      error "failed to release lock", error = error
-      return err("failed to unlock the db.")
-
-  (await self.performWriteQuery(createPartitionQuery)).isOkOr:
+  (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr:
     if error.contains("already exists"):
       debug "skip create new partition as it already exists: ", skipped_error = $error
       return ok()

+    if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
+      debug "skip create new partition because the advisory lock is held by another instance"
+      return ok()
+
     ## for any different error, just consider it
     return err(fmt"error adding partition [{partitionName}]: " & $error)

@@ -953,9 +992,12 @@ proc addPartition(
   self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
   return ok()

-proc initializePartitionsInfo(
+proc refreshPartitionsInfo(
     self: PostgresDriver
 ): Future[ArchiveDriverResult[void]] {.async.} =
+  debug "refreshPartitionsInfo"
+  self.partitionMngr.clearPartitionInfo()
+
   let partitionNamesRes = await self.getPartitionsList()
   if not partitionNamesRes.isOk():
     return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
@@ -994,13 +1036,13 @@ proc loopPartitionFactory(

   debug "starting loopPartitionFactory"

-  ## First of all, let's make the 'partition_manager' aware of the current partitions
-  (await self.initializePartitionsInfo()).isOkOr:
-    onFatalError("issue in loopPartitionFactory: " & $error)
-
   while true:
     trace "Check if we need to create a new partition"

+    ## Let's make the 'partition_manager' aware of the current partitions
+    (await self.refreshPartitionsInfo()).isOkOr:
+      onFatalError("issue in loopPartitionFactory: " & $error)
+
     let now = times.now().toTime().toUnix()

     if self.partitionMngr.isEmpty():
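The key property of the new performWriteQueryWithLock is that pg_try_advisory_lock and pg_advisory_unlock execute inside a single DO block, hence on a single connection, which eliminates the acquire-on-one-connection/release-on-another failure described in the commit message. Callers treat a failed try-lock as "another instance is already doing this work", as addPartition now does. Sketched usage, with an illustrative partition name and nanosecond range:

    let createPartitionQuery =
      "CREATE TABLE IF NOT EXISTS messages_1718000000 PARTITION OF messages " &
      "FOR VALUES FROM ('1718000000000000000') TO ('1718003600000000000');"

    (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr:
      if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
        return ok() # another node holds the lock and will create the partition
      return err("error adding partition: " & $error)

pg_try_advisory_lock returns immediately instead of blocking, so a second node skips the work rather than queueing behind the first.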
From 88d25755db2106b668677dc094525a47023f9d39 Mon Sep 17 00:00:00 2001
From: Darshan K <35736874+darshankabariya@users.noreply.github.com>
Date: Fri, 14 Jun 2024 18:29:42 +0530
Subject: [PATCH 6/6] fix: mount metadata in wakucanary (#2793)

* chore: integrate cluster id and shards into the waku node.
---
 apps/wakucanary/README.md               |  4 +++-
 apps/wakucanary/wakucanary.nim          | 24 ++++++++++++++++++++++++
 waku/node/peer_manager/peer_manager.nim |  4 ++--
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/apps/wakucanary/README.md b/apps/wakucanary/README.md
index 2f8e5275e..6ae4ca3e9 100644
--- a/apps/wakucanary/README.md
+++ b/apps/wakucanary/README.md
@@ -15,9 +15,11 @@ The following options are available:
   -p, --protocol                Protocol required to be supported: store,relay,lightpush,filter (can be used multiple times).
   -l, --log-level               Sets the log level [=LogLevel.DEBUG].
-  -np, --node-port              Listening port for waku node [=60000].
+  -np, --node-port              Listening port for waku node [=60000].
       --websocket-secure-key-path   Secure websocket key path: '/path/to/key.txt' .
       --websocket-secure-cert-path  Secure websocket Certificate path: '/path/to/cert.txt' .
+  -c, --cluster-id              Cluster ID of the fleet node to check status [Default=1]
+  -s, --shard                   Shard index to subscribe to [ Argument may be repeated ]
 ```

diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim
index 175c962ed..8aa75affd 100644
--- a/apps/wakucanary/wakucanary.nim
+++ b/apps/wakucanary/wakucanary.nim
@@ -84,6 +84,21 @@ type WakuCanaryConf* = object
     desc: "Ping the peer node to measure latency", defaultValue: true, name: "ping"
   .}: bool

+  shards* {.
+    desc: "Shard index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
+    defaultValue: @[],
+    name: "shard",
+    abbr: "s"
+  .}: seq[uint16]
+
+  clusterId* {.
+    desc:
+      "Cluster id that the node is running in. A node in a different cluster id is disconnected.",
+    defaultValue: 1,
+    name: "cluster-id",
+    abbr: "c"
+  .}: uint16
+
 proc parseCmdArg*(T: type chronos.Duration, p: string): T =
   try:
     result = chronos.seconds(parseInt(p))
@@ -190,6 +205,13 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =

   var enrBuilder = EnrBuilder.init(nodeKey)

+  let relayShards = RelayShards.init(conf.clusterId, conf.shards).valueOr:
+    error "Relay shards initialization failed", error = error
+    return 1
+  enrBuilder.withWakuRelaySharding(relayShards).isOkOr:
+    error "Building ENR with relay sharding failed", error = error
+    return 1
+
   let recordRes = enrBuilder.build()
   let record =
     if recordRes.isErr():
@@ -214,6 +236,8 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
     )

   let node = builder.build().tryGet()
+  node.mountMetadata(conf.clusterId).isOkOr:
+    error "failed to mount waku metadata protocol: ", err = error

   if conf.ping:
     try:

diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index 679041580..a2993c524 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -380,8 +380,8 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
       pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
       not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
     ):
-      let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]"
-      let otherShardsString = "[ " & metadata.shards.join(", ") & "]"
+      let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]"
+      let otherShardsString = "[ " & metadata.shards.join(", ") & " ]"
       reason =
         "no shards in common: my_shards = " & myShardsString & " others_shards = " &
         otherShardsString
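Taken together, the canary now derives relay shard information for its ENR from the two new flags and mounts the metadata protocol for the configured cluster, so a peer on a different cluster id is rejected with a readable reason (the reformatted shard strings in the last hunk). A hypothetical invocation exercising the new flags, with the target address and other options elided:

    ./build/wakucanary --cluster-id=1 --shard=0 --shard=1 --protocol=relay ...

Repeating --shard accumulates entries into the seq[uint16] config field, which is what the "Argument may be repeated" note in the README refers to.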