diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index da57991c1..3723291e3 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -33,8 +33,8 @@ import import waku/[ waku_core, - waku_lightpush/common, - waku_lightpush/rpc, + waku_lightpush_legacy/common, + waku_lightpush_legacy/rpc, waku_enr, discovery/waku_dnsdisc, waku_store_legacy, @@ -227,9 +227,9 @@ proc publish(c: Chat, line: string) = c.node.wakuRlnRelay.lastEpoch = proof.epoch try: - if not c.node.wakuLightPush.isNil(): + if not c.node.wakuLegacyLightPush.isNil(): # Attempt lightpush - (waitFor c.node.lightpushPublish(some(DefaultPubsubTopic), message)).isOkOr: + (waitFor c.node.legacyLightpushPublish(some(DefaultPubsubTopic), message)).isOkOr: error "failed to publish lightpush message", error = error else: (waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr: @@ -502,8 +502,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if conf.lightpushnode != "": let peerInfo = parsePeerInfo(conf.lightpushnode) if peerInfo.isOk(): - await mountLightPush(node) - node.mountLightPushClient() + await mountLegacyLightPush(node) + node.mountLegacyLightPushClient() node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec) else: error "LightPush not mounted. Couldn't parse conf.lightpushnode", diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index 2d48348b2..32f802fe4 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -145,7 +145,7 @@ proc publishMessages( lightpushContentTopic, renderMsgSize, ) - let wlpRes = await wakuNode.lightpushPublish( + let wlpRes = await wakuNode.legacyLightpushPublish( some(lightpushPubsubTopic), message, actualServicePeer ) @@ -209,7 +209,7 @@ proc setupAndPublish*( if isNil(wakuNode.wakuLightpushClient): # if we have not yet initialized lightpush client, then do it as the only way we can get here is # by having a service peer discovered. - wakuNode.mountLightPushClient() + wakuNode.mountLegacyLightPushClient() # give some time to receiver side to set up let waitTillStartTesting = conf.startPublishingAfter.seconds diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index 5f6ec4ee0..c23b80e72 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -202,7 +202,7 @@ when isMainModule: var codec = WakuLightPushCodec # mounting relevant client, for PX filter client must be mounted ahead if conf.testFunc == TesterFunctionality.SENDER: - wakuApp.node.mountLightPushClient() + wakuApp.node.mountLegacyLightPushClient() codec = WakuLightPushCodec else: waitFor wakuApp.node.mountFilterClient() diff --git a/examples/lightpush_publisher.nim b/examples/lightpush_publisher.nim index 0615c1f6b..b0f919a89 100644 --- a/examples/lightpush_publisher.nim +++ b/examples/lightpush_publisher.nim @@ -70,7 +70,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = let node = builder.build().tryGet() node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") - node.mountLightPushClient() + node.mountLegacyLightPushClient() await node.start() node.peerManager.start() @@ -87,8 +87,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = let lightpushPeer = parsePeerInfo(LightpushPeer).get() - let res = - await node.lightpushPublish(some(LightpushPubsubTopic), message, lightpushPeer) + let res = await node.legacyLightpushPublish( + some(LightpushPubsubTopic), message, lightpushPeer + ) if res.isOk: notice "published message", diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim index 70a6b6116..e7006ad06 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim @@ -2,12 +2,13 @@ import options import chronicles, chronos, results import ../../../../../waku/waku_core/message/message, + ../../../../../waku/waku_core/codecs, ../../../../../waku/factory/waku, ../../../../../waku/waku_core/message, ../../../../../waku/waku_core/time, # Timestamp ../../../../../waku/waku_core/topics/pubsub_topic, - ../../../../../waku/waku_lightpush/client, - ../../../../../waku/waku_lightpush/common, + ../../../../../waku/waku_lightpush_legacy/client, + ../../../../../waku/waku_lightpush_legacy/common, ../../../../../waku/node/peer_manager/peer_manager, ../../../../alloc @@ -98,7 +99,7 @@ proc process*( return err(errorMsg) let msgHashHex = ( - await waku.node.wakuLightpushClient.publish( + await waku.node.wakuLegacyLightpushClient.publish( pubsubTopic, msg, peer = peerOpt.get() ) ).valueOr: diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index 4f7c8ac5e..232630591 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -115,14 +115,13 @@ proc process*( let msg = self.message.toWakuMessage() let pubsubTopic = $self.pubsubTopic - let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg) - if numPeers == 0: - let errorMsg = "Message not sent because no peers found." + (await waku.node.wakuRelay.publish(pubsubTopic, msg)).isOkOr: + let errorMsg = "Message not sent." & $error error "PUBLISH failed", error = errorMsg return err(errorMsg) - elif numPeers > 0: - let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex - return ok(msgHash) + + let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex + return ok(msgHash) of LIST_CONNECTED_PEERS: let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr: error "LIST_CONNECTED_PEERS failed", error = error diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 46c235b51..39ac57caf 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -65,6 +65,7 @@ import ./node/test_all, ./waku_filter_v2/test_all, ./waku_peer_exchange/test_all, + ./waku_lightpush_legacy/test_all, ./waku_lightpush/test_all, ./waku_relay/test_all, ./incentivization/test_all @@ -72,7 +73,6 @@ import import # Waku v2 tests ./test_wakunode, - ./test_wakunode_lightpush, ./test_peer_store_extended, ./test_message_cache, ./test_peer_manager, @@ -98,7 +98,7 @@ import ./wakunode_rest/test_rest_relay_serdes, ./wakunode_rest/test_rest_serdes, ./wakunode_rest/test_rest_filter, - ./wakunode_rest/test_rest_lightpush, + ./wakunode_rest/test_rest_lightpush_legacy, ./wakunode_rest/test_rest_admin, ./wakunode_rest/test_rest_cors, ./wakunode_rest/test_rest_health diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index b35c4b92f..d601d1e24 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -11,7 +11,7 @@ import import waku/[node/peer_manager, waku_core], waku/incentivization/[rpc, reputation_manager], - waku/waku_lightpush/rpc + waku/waku_lightpush_legacy/rpc suite "Waku Incentivization PoC Reputation": var manager {.threadvar.}: ReputationManager diff --git a/tests/node/test_all.nim b/tests/node/test_all.nim index 6c3d76175..4840f49a2 100644 --- a/tests/node/test_all.nim +++ b/tests/node/test_all.nim @@ -1,5 +1,6 @@ import ./test_wakunode_filter, + ./test_wakunode_legacy_lightpush, ./test_wakunode_lightpush, ./test_wakunode_peer_exchange, ./test_wakunode_store, diff --git a/tests/node/test_wakunode_legacy_lightpush.nim b/tests/node/test_wakunode_legacy_lightpush.nim new file mode 100644 index 000000000..ab23921a0 --- /dev/null +++ b/tests/node/test_wakunode_legacy_lightpush.nim @@ -0,0 +1,233 @@ +{.used.} + +import + std/[options, tables, sequtils, tempfiles, strutils], + stew/shims/net as stewNet, + testutils/unittests, + chronos, + chronicles, + std/strformat, + os, + libp2p/[peerstore, crypto/crypto] + +import + waku/[ + waku_core, + node/peer_manager, + node/waku_node, + waku_filter_v2, + waku_filter_v2/client, + waku_filter_v2/subscriptions, + waku_lightpush_legacy, + waku_lightpush_legacy/common, + waku_lightpush_legacy/client, + waku_lightpush_legacy/protocol_metrics, + waku_lightpush_legacy/rpc, + waku_rln_relay, + ], + ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils], + ../resources/payloads + +suite "Waku Legacy Lightpush - End To End": + var + handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handler {.threadvar.}: PushMessageHandler + + server {.threadvar.}: WakuNode + client {.threadvar.}: WakuNode + + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + pubsubTopic {.threadvar.}: PubsubTopic + contentTopic {.threadvar.}: ContentTopic + message {.threadvar.}: WakuMessage + + asyncSetup: + handlerFuture = newPushHandlerFuture() + handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + await allFutures(server.start(), client.start()) + await server.start() + + await server.mountRelay() + await server.mountLegacyLightpush() # without rln-relay + client.mountLegacyLightpushClient() + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage() + + asyncTeardown: + await server.stop() + + suite "Assessment of Message Relaying Mechanisms": + asyncTest "Via 11/WAKU2-RELAY from Relay/Full Node": + # Given a light lightpush client + let lightpushClient = + newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + lightpushClient.mountLegacyLightpushClient() + + # When the client publishes a message + let publishResponse = await lightpushClient.legacyLightpushPublish( + some(pubsubTopic), message, serverRemotePeerInfo + ) + + if not publishResponse.isOk(): + echo "Publish failed: ", publishResponse.error() + + # Then the message is not relayed but not due to RLN + assert publishResponse.isErr(), "We expect an error response" + + assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer), + "incorrect error response" + + suite "Waku LightPush Validation Tests": + asyncTest "Validate message size exceeds limit": + let msgOverLimit = fakeWakuMessage( + contentTopic = contentTopic, + payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024), + ) + + # When the client publishes an over-limit message + let publishResponse = await client.legacyLightpushPublish( + some(pubsubTopic), msgOverLimit, serverRemotePeerInfo + ) + + check: + publishResponse.isErr() + publishResponse.error == + fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" + +suite "RLN Proofs as a Lightpush Service": + var + handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handler {.threadvar.}: PushMessageHandler + + server {.threadvar.}: WakuNode + client {.threadvar.}: WakuNode + + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + pubsubTopic {.threadvar.}: PubsubTopic + contentTopic {.threadvar.}: ContentTopic + message {.threadvar.}: WakuMessage + + asyncSetup: + handlerFuture = newPushHandlerFuture() + handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + # mount rln-relay + let wakuRlnConfig = WakuRlnConfig( + rlnRelayDynamic: false, + rlnRelayCredIndex: some(1.uint), + rlnRelayUserMessageLimit: 1, + rlnEpochSizeSec: 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode"), + ) + + await allFutures(server.start(), client.start()) + await server.start() + + await server.mountRelay() + await server.mountRlnRelay(wakuRlnConfig) + await server.mountLegacyLightPush() + client.mountLegacyLightPushClient() + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage() + + asyncTeardown: + await server.stop() + + suite "Lightpush attaching RLN proofs": + asyncTest "Message is published when RLN enabled": + # Given a light lightpush client + let lightpushClient = + newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + lightpushClient.mountLegacyLightPushClient() + + # When the client publishes a message + let publishResponse = await lightpushClient.legacyLightpushPublish( + some(pubsubTopic), message, serverRemotePeerInfo + ) + + if not publishResponse.isOk(): + echo "Publish failed: ", publishResponse.error() + + # Then the message is not relayed but not due to RLN + assert publishResponse.isErr(), "We expect an error response" + check publishResponse.error == protocol_metrics.notPublishedAnyPeer + +suite "Waku Legacy Lightpush message delivery": + asyncTest "Legacy lightpush message flow succeed": + ## Setup + let + lightNodeKey = generateSecp256k1Key() + lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + bridgeNodeKey = generateSecp256k1Key() + bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + destNodeKey = generateSecp256k1Key() + destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) + + await destNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountLegacyLightPush() + lightNode.mountLegacyLightPushClient() + + discard await lightNode.peerManager.dialPeer( + bridgeNode.peerInfo.toRemotePeerInfo(), WakuLegacyLightPushCodec + ) + await sleepAsync(100.milliseconds) + await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) + + ## Given + let message = fakeWakuMessage() + + var completionFutRelay = newFuture[bool]() + proc relayHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + check: + topic == DefaultPubsubTopic + msg == message + completionFutRelay.complete(true) + + destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + + # Wait for subscription to take effect + await sleepAsync(100.millis) + + ## When + let res = await lightNode.legacyLightpushPublish(some(DefaultPubsubTopic), message) + assert res.isOk(), $res.error + + ## Then + check await completionFutRelay.withTimeout(5.seconds) + + ## Cleanup + await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop()) diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index 30158ebd1..eeef02a32 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -19,15 +19,13 @@ import waku_filter_v2/client, waku_filter_v2/subscriptions, waku_lightpush, - waku_lightpush/common, - waku_lightpush/client, - waku_lightpush/protocol_metrics, - waku_lightpush/rpc, waku_rln_relay, ], ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils], ../resources/payloads +const PublishedToOnePeer = 1 + suite "Waku Lightpush - End To End": var handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] @@ -45,9 +43,9 @@ suite "Waku Lightpush - End To End": handlerFuture = newPushHandlerFuture() handler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = handlerFuture.complete((pubsubTopic, message)) - return ok() + return ok(PublishedToOnePeer) let serverKey = generateSecp256k1Key() @@ -80,16 +78,16 @@ suite "Waku Lightpush - End To End": # When the client publishes a message let publishResponse = await lightpushClient.lightpushPublish( - some(pubsubTopic), message, serverRemotePeerInfo + some(pubsubTopic), message, some(serverRemotePeerInfo) ) if not publishResponse.isOk(): - echo "Publish failed: ", publishResponse.error() + echo "Publish failed: ", publishResponse.error.code # Then the message is not relayed but not due to RLN assert publishResponse.isErr(), "We expect an error response" - assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer), + assert (publishResponse.error.code == NO_PEERS_TO_RELAY), "incorrect error response" suite "Waku LightPush Validation Tests": @@ -101,13 +99,14 @@ suite "Waku Lightpush - End To End": # When the client publishes an over-limit message let publishResponse = await client.lightpushPublish( - some(pubsubTopic), msgOverLimit, serverRemotePeerInfo + some(pubsubTopic), msgOverLimit, some(serverRemotePeerInfo) ) check: publishResponse.isErr() - publishResponse.error == - fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" + publishResponse.error.code == INVALID_MESSAGE_ERROR + publishResponse.error.desc == + some(fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes") suite "RLN Proofs as a Lightpush Service": var @@ -126,9 +125,9 @@ suite "RLN Proofs as a Lightpush Service": handlerFuture = newPushHandlerFuture() handler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = handlerFuture.complete((pubsubTopic, message)) - return ok() + return ok(PublishedToOnePeer) let serverKey = generateSecp256k1Key() @@ -151,8 +150,8 @@ suite "RLN Proofs as a Lightpush Service": await server.mountRelay() await server.mountRlnRelay(wakuRlnConfig) - await server.mountLightpush() - client.mountLightpushClient() + await server.mountLightPush() + client.mountLightPushClient() serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() pubsubTopic = DefaultPubsubTopic @@ -167,11 +166,11 @@ suite "RLN Proofs as a Lightpush Service": # Given a light lightpush client let lightpushClient = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) - lightpushClient.mountLightpushClient() + lightpushClient.mountLightPushClient() # When the client publishes a message let publishResponse = await lightpushClient.lightpushPublish( - some(pubsubTopic), message, serverRemotePeerInfo + some(pubsubTopic), message, some(serverRemotePeerInfo) ) if not publishResponse.isOk(): @@ -179,5 +178,55 @@ suite "RLN Proofs as a Lightpush Service": # Then the message is not relayed but not due to RLN assert publishResponse.isErr(), "We expect an error response" - assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer), - "incorrect error response" + check publishResponse.error.code == NO_PEERS_TO_RELAY + +suite "Waku Lightpush message delivery": + asyncTest "lightpush message flow succeed": + ## Setup + let + lightNodeKey = generateSecp256k1Key() + lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + bridgeNodeKey = generateSecp256k1Key() + bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + destNodeKey = generateSecp256k1Key() + destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) + + await destNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountRelay(@[DefaultRelayShard]) + await bridgeNode.mountLightPush() + lightNode.mountLightPushClient() + + discard await lightNode.peerManager.dialPeer( + bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec + ) + await sleepAsync(100.milliseconds) + await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) + + ## Given + let message = fakeWakuMessage() + + var completionFutRelay = newFuture[bool]() + proc relayHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + check: + topic == DefaultPubsubTopic + msg == message + completionFutRelay.complete(true) + + destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + + # Wait for subscription to take effect + await sleepAsync(100.millis) + + ## When + let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message) + assert res.isOk(), $res.error + + ## Then + check await completionFutRelay.withTimeout(5.seconds) + + ## Cleanup + await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop()) diff --git a/tests/node/test_wakunode_sharding.nim b/tests/node/test_wakunode_sharding.nim index 95b4043d9..bdd6859b9 100644 --- a/tests/node/test_wakunode_sharding.nim +++ b/tests/node/test_wakunode_sharding.nim @@ -286,7 +286,7 @@ suite "Sharding": asyncTest "lightpush": # Given a connected server and client subscribed to the same pubsub topic - client.mountLightPushClient() + client.mountLegacyLightPushClient() await server.mountLightpush() let @@ -299,7 +299,7 @@ suite "Sharding": let msg = WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic") - lightpublishRespnse = await client.lightpushPublish( + lightpublishRespnse = await client.legacyLightpushPublish( some(topic), msg, server.switch.peerInfo.toRemotePeerInfo() ) @@ -409,7 +409,7 @@ suite "Sharding": asyncTest "lightpush (automatic sharding filtering)": # Given a connected server and client using the same content topic (with two different formats) - client.mountLightPushClient() + client.mountLegacyLightPushClient() await server.mountLightpush() let @@ -424,7 +424,7 @@ suite "Sharding": let msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull) - lightpublishRespnse = await client.lightpushPublish( + lightpublishRespnse = await client.legacyLightpushPublish( some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo() ) @@ -567,7 +567,7 @@ suite "Sharding": asyncTest "lightpush - exclusion (automatic sharding filtering)": # Given a connected server and client using different content topics - client.mountLightPushClient() + client.mountLegacyLightPushClient() await server.mountLightpush() let @@ -584,7 +584,7 @@ suite "Sharding": # When a peer publishes a message in the server's subscribed topic (the client, for testing easeness) let msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2) - lightpublishRespnse = await client.lightpushPublish( + lightpublishRespnse = await client.legacyLightpushPublish( some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo() ) @@ -854,12 +854,12 @@ suite "Sharding": (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() asyncTest "Protocol with Unconfigured PubSub Topic Fails": - # Given a + # Given a let contentTopic = "myContentTopic" topic = "/waku/2/rs/0/1" # Using a different topic to simulate "unconfigured" pubsub topic - # but to have a handler (and be able to assert the test) + # but to have a handler (and be able to assert the test) serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0") clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0") @@ -878,7 +878,7 @@ suite "Sharding": asyncTest "Waku LightPush Sharding (Static Sharding)": # Given a connected server and client using two different pubsub topics - client.mountLightPushClient() + client.mountLegacyLightPushClient() await server.mountLightpush() # Given a connected server and client subscribed to multiple pubsub topics @@ -898,7 +898,7 @@ suite "Sharding": # When a peer publishes a message (the client, for testing easeness) in topic1 let msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic) - lightpublishRespnse = await client.lightpushPublish( + lightpublishRespnse = await client.legacyLightpushPublish( some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo() ) @@ -916,7 +916,7 @@ suite "Sharding": clientHandler2.reset() let msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic) - lightpublishResponse2 = await client.lightpushPublish( + lightpublishResponse2 = await client.legacyLightpushPublish( some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo() ) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 333433dc2..4fd148b81 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -770,7 +770,7 @@ procSuite "Peer Manager": # service peers node.peerManager.addServicePeer(peers[0], WakuStoreCodec) - node.peerManager.addServicePeer(peers[1], WakuLightPushCodec) + node.peerManager.addServicePeer(peers[1], WakuLegacyLightPushCodec) node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec) # relay peers (should not be added) @@ -788,7 +788,7 @@ procSuite "Peer Manager": # all service peers are added to its service slot check: node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId - node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId + node.peerManager.serviceSlots[WakuLegacyLightPushCodec].peerId == peers[1].peerId node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId # but the relay peer is not @@ -917,13 +917,13 @@ procSuite "Peer Manager": selectedPeer2.get().peerId == peers[0].peerId # And return none if we dont have any peer for that protocol - let selectedPeer3 = pm.selectPeer(WakuLightPushCodec) + let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec) check: selectedPeer3.isSome() == false # Now we add service peers for different protocols peer[1..3] pm.addServicePeer(peers[1], WakuStoreCodec) - pm.addServicePeer(peers[2], WakuLightPushCodec) + pm.addServicePeer(peers[2], WakuLegacyLightPushCodec) # We no longer get one from the peerstore. Slots are being used instead. let selectedPeer4 = pm.selectPeer(WakuStoreCodec) @@ -931,7 +931,7 @@ procSuite "Peer Manager": selectedPeer4.isSome() == true selectedPeer4.get().peerId == peers[1].peerId - let selectedPeer5 = pm.selectPeer(WakuLightPushCodec) + let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec) check: selectedPeer5.isSome() == true selectedPeer5.get().peerId == peers[2].peerId diff --git a/tests/test_waku_enr.nim b/tests/test_waku_enr.nim index 2b91e6147..b6571b09f 100644 --- a/tests/test_waku_enr.nim +++ b/tests/test_waku_enr.nim @@ -1,7 +1,7 @@ {.used.} import std/[options, sequtils], stew/results, testutils/unittests -import waku/waku_core, waku/waku_enr, ./testlib/wakucore, waku/waku_core/codecs +import waku/waku_core, waku/waku_enr, ./testlib/wakucore suite "Waku ENR - Capabilities bitfield": test "check capabilities support": diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim deleted file mode 100644 index c680fb468..000000000 --- a/tests/test_wakunode_lightpush.nim +++ /dev/null @@ -1,58 +0,0 @@ -{.used.} - -import std/options, stew/shims/net as stewNet, testutils/unittests, chronos -import - waku/[waku_core, waku_lightpush/common, node/peer_manager, waku_node], - ./testlib/wakucore, - ./testlib/wakunode - -suite "WakuNode - Lightpush": - asyncTest "Lightpush message return success": - ## Setup - let - lightNodeKey = generateSecp256k1Key() - lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0)) - bridgeNodeKey = generateSecp256k1Key() - bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0)) - destNodeKey = generateSecp256k1Key() - destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0)) - - await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - - await destNode.mountRelay(@[DefaultRelayShard]) - await bridgeNode.mountRelay(@[DefaultRelayShard]) - await bridgeNode.mountLightPush() - lightNode.mountLightPushClient() - - discard await lightNode.peerManager.dialPeer( - bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec - ) - await sleepAsync(100.milliseconds) - await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) - - ## Given - let message = fakeWakuMessage() - - var completionFutRelay = newFuture[bool]() - proc relayHandler( - topic: PubsubTopic, msg: WakuMessage - ): Future[void] {.async, gcsafe.} = - check: - topic == DefaultPubsubTopic - msg == message - completionFutRelay.complete(true) - - destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) - - # Wait for subscription to take effect - await sleepAsync(100.millis) - - ## When - let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message) - assert res.isOk(), $res.error - - ## Then - check await completionFutRelay.withTimeout(5.seconds) - - ## Cleanup - await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop()) diff --git a/tests/waku_core/test_peers.nim b/tests/waku_core/test_peers.nim index 366b1d25f..3dc68fa1a 100644 --- a/tests/waku_core/test_peers.nim +++ b/tests/waku_core/test_peers.nim @@ -7,7 +7,7 @@ import libp2p/peerid, libp2p/errors, confutils/toml/std/net -import waku/[waku_core, waku_core/codecs, waku_enr], ../testlib/wakucore +import waku/[waku_core, waku_enr], ../testlib/wakucore suite "Waku Core - Peers": test "Peer info parses correctly": diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 45bbe125c..f3e94cb47 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -5,6 +5,7 @@ import std/options, chronicles, chronos, libp2p/crypto/crypto import waku/node/peer_manager, waku/waku_core, + waku/waku_core/topics/sharding, waku/waku_lightpush, waku/waku_lightpush/[client, common], waku/common/rate_limit/setting, @@ -17,7 +18,8 @@ proc newTestWakuLightpushNode*( ): Future[WakuLightPush] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting) + wakuSharding = Sharding(clusterId: 1, shardCountGenZero: 8) + proto = WakuLightPush.new(peerManager, rng, handler, wakuSharding, rateLimitSetting) await proto.start() switch.mount(proto) diff --git a/tests/waku_lightpush/test_client.nim b/tests/waku_lightpush/test_client.nim index 8b8e5529e..060a8c22b 100644 --- a/tests/waku_lightpush/test_client.nim +++ b/tests/waku_lightpush/test_client.nim @@ -13,10 +13,7 @@ import waku_core, waku_lightpush, waku_lightpush/client, - waku_lightpush/common, waku_lightpush/protocol_metrics, - waku_lightpush/rpc, - waku_lightpush/rpc_codec, ], ../testlib/[assertions, wakucore, testasync, futures, testutils], ./lightpush_utils, @@ -42,12 +39,14 @@ suite "Waku Lightpush Client": handlerFuture = newPushHandlerFuture() handler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = let msgLen = message.encode().buffer.len if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024: - return err("length greater than maxMessageSize") + return + lighpushErrorResult(PAYLOAD_TOO_LARGE, "length greater than maxMessageSize") handlerFuture.complete((pubsubTopic, message)) - return ok() + # return that we published the message to 1 peer. + return ok(1) serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() @@ -80,7 +79,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload let publishResponse = - await client.publish(pubsubTopic, message, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message, serverRemotePeerInfo) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -92,8 +91,9 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() - let publishResponse2 = - await client.publish(pubsub_topics.CURRENT, message2, serverRemotePeerInfo) + let publishResponse2 = await client.publish( + some(pubsub_topics.CURRENT), message2, serverRemotePeerInfo + ) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -106,7 +106,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse3 = await client.publish( - pubsub_topics.CURRENT_NESTED, message3, serverRemotePeerInfo + some(pubsub_topics.CURRENT_NESTED), message3, serverRemotePeerInfo ) # Then the message is received by the server @@ -119,8 +119,9 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() - let publishResponse4 = - await client.publish(pubsub_topics.SHARDING, message4, serverRemotePeerInfo) + let publishResponse4 = await client.publish( + some(pubsub_topics.SHARDING), message4, serverRemotePeerInfo + ) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -133,7 +134,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse5 = - await client.publish(pubsub_topics.PLAIN, message5, serverRemotePeerInfo) + await client.publish(some(pubsub_topics.PLAIN), message5, serverRemotePeerInfo) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -146,7 +147,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse6 = - await client.publish(pubsub_topics.LEGACY, message6, serverRemotePeerInfo) + await client.publish(some(pubsub_topics.LEGACY), message6, serverRemotePeerInfo) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -159,7 +160,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse7 = await client.publish( - pubsub_topics.LEGACY_NESTED, message7, serverRemotePeerInfo + some(pubsub_topics.LEGACY_NESTED), message7, serverRemotePeerInfo ) # Then the message is received by the server @@ -173,7 +174,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse8 = await client.publish( - pubsub_topics.LEGACY_ENCODING, message8, serverRemotePeerInfo + some(pubsub_topics.LEGACY_ENCODING), message8, serverRemotePeerInfo ) # Then the message is received by the server @@ -187,7 +188,7 @@ suite "Waku Lightpush Client": # When publishing a valid payload handlerFuture = newPushHandlerFuture() let publishResponse9 = - await client.publish(pubsubTopic, message9, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message9, serverRemotePeerInfo) # Then the message is received by the server discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) @@ -221,7 +222,7 @@ suite "Waku Lightpush Client": # When publishing the 1KiB payload let publishResponse1 = - await client.publish(pubsubTopic, message1, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message1, serverRemotePeerInfo) # Then the message is received by the server assertResultOk publishResponse1 @@ -230,7 +231,7 @@ suite "Waku Lightpush Client": # When publishing the 10KiB payload handlerFuture = newPushHandlerFuture() let publishResponse2 = - await client.publish(pubsubTopic, message2, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message2, serverRemotePeerInfo) # Then the message is received by the server assertResultOk publishResponse2 @@ -239,7 +240,7 @@ suite "Waku Lightpush Client": # When publishing the 100KiB payload handlerFuture = newPushHandlerFuture() let publishResponse3 = - await client.publish(pubsubTopic, message3, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message3, serverRemotePeerInfo) # Then the message is received by the server assertResultOk publishResponse3 @@ -248,7 +249,7 @@ suite "Waku Lightpush Client": # When publishing the 1MiB + 63KiB + 911B payload (1113999B) handlerFuture = newPushHandlerFuture() let publishResponse4 = - await client.publish(pubsubTopic, message4, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message4, serverRemotePeerInfo) # Then the message is received by the server assertResultOk publishResponse4 @@ -257,11 +258,12 @@ suite "Waku Lightpush Client": # When publishing the 1MiB + 63KiB + 912B payload (1114000B) handlerFuture = newPushHandlerFuture() let publishResponse5 = - await client.publish(pubsubTopic, message5, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message5, serverRemotePeerInfo) # Then the message is not received by the server check: - not publishResponse5.isOk() + publishResponse5.isErr() + publishResponse5.error.code == PAYLOAD_TOO_LARGE (await handlerFuture.waitForResult()).isErr() asyncTest "Invalid Encoding Payload": @@ -271,16 +273,12 @@ suite "Waku Lightpush Client": # When publishing the payload let publishResponse = await server.handleRequest(clientPeerId, fakeBuffer) - # Then the response is negative - check: - publishResponse.requestId == "" - # And the error is returned - let response = publishResponse.response.get() check: - response.isSuccess == false - response.info.isSome() - scanf(response.info.get(), decodeRpcFailure) + publishResponse.requestId == "N/A" + publishResponse.statusCode == LightpushStatusCode.BAD_REQUEST.uint32 + publishResponse.statusDesc.isSome() + scanf(publishResponse.statusDesc.get(), decodeRpcFailure) asyncTest "Handle Error": # Given a lightpush server that fails @@ -289,9 +287,9 @@ suite "Waku Lightpush Client": handlerFuture2 = newFuture[void]() handler2 = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = handlerFuture2.complete() - return err(handlerError) + return lighpushErrorResult(PAYLOAD_TOO_LARGE, handlerError) let serverSwitch2 = newTestSwitch() @@ -303,11 +301,12 @@ suite "Waku Lightpush Client": # When publishing a payload let publishResponse = - await client.publish(pubsubTopic, message, serverRemotePeerInfo2) + await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2) # Then the response is negative check: - publishResponse.error() == handlerError + publishResponse.error.code == PAYLOAD_TOO_LARGE + publishResponse.error.desc == some(handlerError) (await handlerFuture2.waitForResult()).isOk() # Cleanup @@ -317,7 +316,7 @@ suite "Waku Lightpush Client": asyncTest "Positive Responses": # When sending a valid PushRequest let publishResponse = - await client.publish(pubsubTopic, message, serverRemotePeerInfo) + await client.publish(some(pubsubTopic), message, serverRemotePeerInfo) # Then the response is positive assertResultOk publishResponse @@ -333,7 +332,8 @@ suite "Waku Lightpush Client": # When sending an invalid PushRequest let publishResponse = - await client.publish(pubsubTopic, message, serverRemotePeerInfo2) + await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2) # Then the response is negative check not publishResponse.isOk() + check publishResponse.error.code == LightpushStatusCode.NO_PEERS_TO_RELAY diff --git a/tests/waku_lightpush/test_ratelimit.nim b/tests/waku_lightpush/test_ratelimit.nim index 148cca3c9..7148be37a 100644 --- a/tests/waku_lightpush/test_ratelimit.nim +++ b/tests/waku_lightpush/test_ratelimit.nim @@ -14,10 +14,7 @@ import waku_core, waku_lightpush, waku_lightpush/client, - waku_lightpush/common, waku_lightpush/protocol_metrics, - waku_lightpush/rpc, - waku_lightpush/rpc_codec, ], ../testlib/[assertions, wakucore, testasync, futures, testutils], ./lightpush_utils, @@ -36,9 +33,9 @@ suite "Rate limited push service": var handlerFuture = newFuture[(string, WakuMessage)]() let handler: PushMessageHandler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = handlerFuture.complete((pubsubTopic, message)) - return ok() + return lightpushSuccessResult(1) # succeed to publish to 1 peer. let tokenPeriod = 500.millis @@ -53,12 +50,13 @@ suite "Rate limited push service": handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) check await handlerFuture.withTimeout(50.millis) - assert requestRes.isOk(), requestRes.error - check handlerFuture.finished() + check: + requestRes.isOk() + handlerFuture.finished() let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() @@ -98,9 +96,9 @@ suite "Rate limited push service": var handlerFuture = newFuture[(string, WakuMessage)]() let handler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = handlerFuture.complete((pubsubTopic, message)) - return ok() + return lightpushSuccessResult(1) let server = @@ -114,7 +112,7 @@ suite "Rate limited push service": let message = fakeWakuMessage() handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) discard await handlerFuture.withTimeout(10.millis) check: @@ -129,12 +127,13 @@ suite "Rate limited push service": let message = fakeWakuMessage() handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) discard await handlerFuture.withTimeout(10.millis) check: requestRes.isErr() - requestRes.error == "TOO_MANY_REQUESTS" + requestRes.error.code == TOO_MANY_REQUESTS + requestRes.error.desc == some(TooManyRequestsMessage) for testCnt in 0 .. 2: await successProc() diff --git a/tests/waku_lightpush_legacy/lightpush_utils.nim b/tests/waku_lightpush_legacy/lightpush_utils.nim new file mode 100644 index 000000000..733fbc8b1 --- /dev/null +++ b/tests/waku_lightpush_legacy/lightpush_utils.nim @@ -0,0 +1,29 @@ +{.used.} + +import std/options, chronicles, chronos, libp2p/crypto/crypto + +import + waku/node/peer_manager, + waku/waku_core, + waku/waku_lightpush_legacy, + waku/waku_lightpush_legacy/[client, common], + waku/common/rate_limit/setting, + ../testlib/[common, wakucore] + +proc newTestWakuLegacyLightpushNode*( + switch: Switch, + handler: PushMessageHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), +): Future[WakuLegacyLightPush] {.async.} = + let + peerManager = PeerManager.new(switch) + proto = WakuLegacyLightPush.new(peerManager, rng, handler, rateLimitSetting) + + await proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuLegacyLightpushClient*(switch: Switch): WakuLegacyLightPushClient = + let peerManager = PeerManager.new(switch) + WakuLegacyLightPushClient.new(peerManager, rng) diff --git a/tests/waku_lightpush_legacy/test_all.nim b/tests/waku_lightpush_legacy/test_all.nim new file mode 100644 index 000000000..4e4980929 --- /dev/null +++ b/tests/waku_lightpush_legacy/test_all.nim @@ -0,0 +1 @@ +import ./test_client, ./test_ratelimit diff --git a/tests/waku_lightpush_legacy/test_client.nim b/tests/waku_lightpush_legacy/test_client.nim new file mode 100644 index 000000000..b71b7d5c3 --- /dev/null +++ b/tests/waku_lightpush_legacy/test_client.nim @@ -0,0 +1,339 @@ +{.used.} + +import + std/[options, strscans], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import + waku/[ + node/peer_manager, + waku_core, + waku_lightpush_legacy, + waku_lightpush_legacy/client, + waku_lightpush_legacy/common, + waku_lightpush_legacy/protocol_metrics, + waku_lightpush_legacy/rpc, + waku_lightpush_legacy/rpc_codec, + ], + ../testlib/[assertions, wakucore, testasync, futures, testutils], + ./lightpush_utils, + ../resources/[pubsub_topics, content_topics, payloads] + +suite "Waku Legacy Lightpush Client": + var + handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handler {.threadvar.}: PushMessageHandler + + serverSwitch {.threadvar.}: Switch + clientSwitch {.threadvar.}: Switch + server {.threadvar.}: WakuLegacyLightPush + client {.threadvar.}: WakuLegacyLightPushClient + + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + clientPeerId {.threadvar.}: PeerId + pubsubTopic {.threadvar.}: PubsubTopic + contentTopic {.threadvar.}: ContentTopic + message {.threadvar.}: WakuMessage + + asyncSetup: + handlerFuture = newPushHandlerFuture() + handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + let msgLen = message.encode().buffer.len + if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024: + return err("length greater than maxMessageSize") + handlerFuture.complete((pubsubTopic, message)) + return ok() + + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + server = await newTestWakuLegacyLightpushNode(serverSwitch, handler) + client = newTestWakuLegacyLightpushClient(clientSwitch) + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + clientPeerId = clientSwitch.peerInfo.peerId + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage() + + asyncTeardown: + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + suite "Verification of PushRequest Payload": + asyncTest "Valid Payload Types": + # Given the following payloads + let + message2 = fakeWakuMessage(payloads.ALPHABETIC, content_topics.CURRENT) + message3 = fakeWakuMessage(payloads.ALPHANUMERIC, content_topics.TESTNET) + message4 = fakeWakuMessage(payloads.ALPHANUMERIC_SPECIAL, content_topics.PLAIN) + message5 = fakeWakuMessage(payloads.EMOJI, content_topics.CURRENT) + message6 = fakeWakuMessage(payloads.CODE, content_topics.TESTNET) + message7 = fakeWakuMessage(payloads.QUERY, content_topics.PLAIN) + message8 = fakeWakuMessage(payloads.TEXT_SMALL, content_topics.CURRENT) + message9 = fakeWakuMessage(payloads.TEXT_LARGE, content_topics.TESTNET) + + # When publishing a valid payload + let publishResponse = + await client.publish(pubsubTopic, message, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsubTopic, message) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse2 = + await client.publish(pubsub_topics.CURRENT, message2, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse2 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.CURRENT, message2) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse3 = await client.publish( + pubsub_topics.CURRENT_NESTED, message3, serverRemotePeerInfo + ) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse3 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.CURRENT_NESTED, message3) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse4 = + await client.publish(pubsub_topics.SHARDING, message4, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse4 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.SHARDING, message4) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse5 = + await client.publish(pubsub_topics.PLAIN, message5, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse5 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.PLAIN, message5) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse6 = + await client.publish(pubsub_topics.LEGACY, message6, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse6 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.LEGACY, message6) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse7 = await client.publish( + pubsub_topics.LEGACY_NESTED, message7, serverRemotePeerInfo + ) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse7 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.LEGACY_NESTED, message7) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse8 = await client.publish( + pubsub_topics.LEGACY_ENCODING, message8, serverRemotePeerInfo + ) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse8 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsub_topics.LEGACY_ENCODING, message8) == handlerFuture.read() + + # When publishing a valid payload + handlerFuture = newPushHandlerFuture() + let publishResponse9 = + await client.publish(pubsubTopic, message9, serverRemotePeerInfo) + + # Then the message is received by the server + discard await handlerFuture.withTimeout(FUTURE_TIMEOUT) + assertResultOk publishResponse9 + check handlerFuture.finished() + + # And the message is received with the correct topic and payload + check (pubsubTopic, message9) == handlerFuture.read() + + asyncTest "Valid Payload Sizes": + # Given some valid payloads + let + overheadBytes: uint64 = 112 + message1 = + fakeWakuMessage(contentTopic = contentTopic, payload = getByteSequence(1024)) + # 1KiB + message2 = fakeWakuMessage( + contentTopic = contentTopic, payload = getByteSequence(10 * 1024) + ) # 10KiB + message3 = fakeWakuMessage( + contentTopic = contentTopic, payload = getByteSequence(100 * 1024) + ) # 100KiB + message4 = fakeWakuMessage( + contentTopic = contentTopic, + payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1), + ) # Inclusive Limit + message5 = fakeWakuMessage( + contentTopic = contentTopic, + payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024), + ) # Exclusive Limit + + # When publishing the 1KiB payload + let publishResponse1 = + await client.publish(pubsubTopic, message1, serverRemotePeerInfo) + + # Then the message is received by the server + assertResultOk publishResponse1 + check (pubsubTopic, message1) == (await handlerFuture.waitForResult()).value() + + # When publishing the 10KiB payload + handlerFuture = newPushHandlerFuture() + let publishResponse2 = + await client.publish(pubsubTopic, message2, serverRemotePeerInfo) + + # Then the message is received by the server + assertResultOk publishResponse2 + check (pubsubTopic, message2) == (await handlerFuture.waitForResult()).value() + + # When publishing the 100KiB payload + handlerFuture = newPushHandlerFuture() + let publishResponse3 = + await client.publish(pubsubTopic, message3, serverRemotePeerInfo) + + # Then the message is received by the server + assertResultOk publishResponse3 + check (pubsubTopic, message3) == (await handlerFuture.waitForResult()).value() + + # When publishing the 1MiB + 63KiB + 911B payload (1113999B) + handlerFuture = newPushHandlerFuture() + let publishResponse4 = + await client.publish(pubsubTopic, message4, serverRemotePeerInfo) + + # Then the message is received by the server + assertResultOk publishResponse4 + check (pubsubTopic, message4) == (await handlerFuture.waitForResult()).value() + + # When publishing the 1MiB + 63KiB + 912B payload (1114000B) + handlerFuture = newPushHandlerFuture() + let publishResponse5 = + await client.publish(pubsubTopic, message5, serverRemotePeerInfo) + + # Then the message is not received by the server + check: + not publishResponse5.isOk() + (await handlerFuture.waitForResult()).isErr() + + asyncTest "Invalid Encoding Payload": + # Given a payload with an invalid encoding + let fakeBuffer = @[byte(42)] + + # When publishing the payload + let publishResponse = await server.handleRequest(clientPeerId, fakeBuffer) + + # Then the response is negative + check: + publishResponse.requestId == "" + + # And the error is returned + let response = publishResponse.response.get() + check: + response.isSuccess == false + response.info.isSome() + scanf(response.info.get(), decodeRpcFailure) + + asyncTest "Handle Error": + # Given a lightpush server that fails + let + handlerError = "handler-error" + handlerFuture2 = newFuture[void]() + handler2 = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture2.complete() + return err(handlerError) + + let + serverSwitch2 = newTestSwitch() + server2 = await newTestWakuLegacyLightpushNode(serverSwitch2, handler2) + + await serverSwitch2.start() + + let serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo() + + # When publishing a payload + let publishResponse = + await client.publish(pubsubTopic, message, serverRemotePeerInfo2) + + # Then the response is negative + check: + publishResponse.error() == handlerError + (await handlerFuture2.waitForResult()).isOk() + + # Cleanup + await serverSwitch2.stop() + + suite "Verification of PushResponse Payload": + asyncTest "Positive Responses": + # When sending a valid PushRequest + let publishResponse = + await client.publish(pubsubTopic, message, serverRemotePeerInfo) + + # Then the response is positive + assertResultOk publishResponse + + # TODO: Improve: Add more negative responses variations + asyncTest "Negative Responses": + # Given a server that does not support Waku Lightpush + let + serverSwitch2 = newTestSwitch() + serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo() + + await serverSwitch2.start() + + # When sending an invalid PushRequest + let publishResponse = + await client.publish(pubsubTopic, message, serverRemotePeerInfo2) + + # Then the response is negative + check not publishResponse.isOk() diff --git a/tests/waku_lightpush_legacy/test_ratelimit.nim b/tests/waku_lightpush_legacy/test_ratelimit.nim new file mode 100644 index 000000000..1d033302f --- /dev/null +++ b/tests/waku_lightpush_legacy/test_ratelimit.nim @@ -0,0 +1,153 @@ +{.used.} + +import + std/[options, strscans], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import + waku/[ + node/peer_manager, + common/rate_limit/setting, + waku_core, + waku_lightpush_legacy, + waku_lightpush_legacy/client, + waku_lightpush_legacy/common, + waku_lightpush_legacy/protocol_metrics, + waku_lightpush_legacy/rpc, + waku_lightpush_legacy/rpc_codec, + ], + ../testlib/[assertions, wakucore, testasync, futures, testutils], + ./lightpush_utils, + ../resources/[pubsub_topics, content_topics, payloads] + +suite "Rate limited push service": + asyncTest "push message with rate limit not violated": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + var handlerFuture = newFuture[(string, WakuMessage)]() + let handler: PushMessageHandler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + tokenPeriod = 500.millis + server = await newTestWakuLegacyLightpushNode( + serverSwitch, handler, some((3, tokenPeriod)) + ) + client = newTestWakuLegacyLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + + let sendMsgProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + + check await handlerFuture.withTimeout(50.millis) + + assert requestRes.isOk(), requestRes.error + check handlerFuture.finished() + + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + + check: + handledMessagePubsubTopic == DefaultPubsubTopic + handledMessage == message + + let waitInBetweenFor = 20.millis + + # Test cannot be too explicit about the time when the TokenBucket resets + # the internal timer, although in normal use there is no use case to care about it. + var firstWaitExtend = 300.millis + + for runCnt in 0 ..< 3: + let startTime = Moment.now() + for testCnt in 0 ..< 3: + await sendMsgProc() + await sleepAsync(20.millis) + + var endTime = Moment.now() + var elapsed: Duration = (endTime - startTime) + await sleepAsync(tokenPeriod - elapsed + firstWaitExtend) + firstWaitEXtend = 100.millis + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "push message with rate limit reject": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + var handlerFuture = newFuture[(string, WakuMessage)]() + let handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + server = await newTestWakuLegacyLightpushNode( + serverSwitch, handler, some((3, 500.millis)) + ) + client = newTestWakuLegacyLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + let topic = DefaultPubsubTopic + + let successProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + discard await handlerFuture.withTimeout(10.millis) + + check: + requestRes.isOk() + handlerFuture.finished() + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + check: + handledMessagePubsubTopic == DefaultPubsubTopic + handledMessage == message + + let rejectProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + discard await handlerFuture.withTimeout(10.millis) + + check: + requestRes.isErr() + requestRes.error == "TOO_MANY_REQUESTS" + + for testCnt in 0 .. 2: + await successProc() + await sleepAsync(20.millis) + + await rejectProc() + + await sleepAsync(500.millis) + + ## next one shall succeed due to the rate limit time window has passed + await successProc() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/tests/wakunode_rest/test_all.nim b/tests/wakunode_rest/test_all.nim index 9c3de0f13..6e34b6fdd 100644 --- a/tests/wakunode_rest/test_all.nim +++ b/tests/wakunode_rest/test_all.nim @@ -4,6 +4,7 @@ import ./test_rest_debug_serdes, ./test_rest_debug, ./test_rest_filter, + ./test_rest_lightpush_legacy, ./test_rest_health, ./test_rest_relay_serdes, ./test_rest_relay, diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush_legacy.nim similarity index 94% rename from tests/wakunode_rest/test_rest_lightpush.nim rename to tests/wakunode_rest/test_rest_lightpush_legacy.nim index 2ff0bf26a..3490a5f80 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush_legacy.nim @@ -15,13 +15,13 @@ import waku_core, waku_node, node/peer_manager, - waku_lightpush/common, + waku_lightpush_legacy/common, waku_api/rest/server, waku_api/rest/client, waku_api/rest/responses, - waku_api/rest/lightpush/types, - waku_api/rest/lightpush/handlers as lightpush_api, - waku_api/rest/lightpush/client as lightpush_api_client, + waku_api/rest/legacy_lightpush/types, + waku_api/rest/legacy_lightpush/handlers as lightpush_api, + waku_api/rest/legacy_lightpush/client as lightpush_api_client, waku_relay, common/rate_limit/setting, ], @@ -61,8 +61,8 @@ proc init( await testSetup.consumerNode.mountRelay() await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountLightPush(rateLimit) - testSetup.pushNode.mountLightPushClient() + await testSetup.serviceNode.mountLegacyLightPush(rateLimit) + testSetup.pushNode.mountLegacyLightPushClient() testSetup.serviceNode.peerManager.addServicePeer( testSetup.consumerNode.peerInfo.toRemotePeerInfo(), WakuRelayCodec @@ -73,7 +73,7 @@ proc init( ) testSetup.pushNode.peerManager.addServicePeer( - testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec + testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLegacyLightPushCodec ) var restPort = Port(0) @@ -209,8 +209,7 @@ suite "Waku v2 Rest API - lightpush": await restLightPushTest.shutdown() - # disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500 - xasyncTest "Request rate limit push message": + asyncTest "Request rate limit push message": # Given let budgetCap = 3 let tokenPeriod = 500.millis @@ -273,7 +272,7 @@ suite "Waku v2 Rest API - lightpush": let endTime = Moment.now() let elapsed: Duration = (endTime - startTime) - await sleepAsync(tokenPeriod - elapsed) + await sleepAsync(tokenPeriod - elapsed + 10.millis) await restLightPushTest.shutdown() diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index a4f0a638e..78a434405 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40 +Subproject commit 78a434405435b69a24e8b263d48d622d57c4db5b diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 625f1a77b..d2d6b1d99 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -16,6 +16,7 @@ import ../waku_enr/sharding, ../waku_node, ../waku_core, + ../waku_core/codecs, ../waku_rln_relay, ../discovery/waku_dnsdisc, ../waku_archive/retention_policy as policy, @@ -33,7 +34,7 @@ import ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, - ../waku_lightpush/common, + ../waku_lightpush_legacy/common, ../common/utils/parse_size_units, ../common/rate_limit/setting, ../common/databases/dburl @@ -359,14 +360,17 @@ proc setupProtocols( if conf.lightpush: try: await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH)) + await mountLegacyLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH)) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) mountLightPushClient(node) + mountLegacyLightPushClient(node) if conf.lightpushnode != "": let lightPushNode = parsePeerInfo(conf.lightpushnode) if lightPushNode.isOk(): node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) + node.peerManager.addServicePeer(lightPushNode.value, WakuLegacyLightPushCodec) else: return err("failed to set node waku lightpush peer: " & lightPushNode.error) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index d5097b711..3177c0fdf 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -1,5 +1,5 @@ import tables, std/options -import waku/waku_lightpush/rpc +import ../waku_lightpush_legacy/rpc type PeerId = string diff --git a/waku/node/delivery_monitor/delivery_monitor.nim b/waku/node/delivery_monitor/delivery_monitor.nim index 28f9e2507..4dda542cc 100644 --- a/waku/node/delivery_monitor/delivery_monitor.nim +++ b/waku/node/delivery_monitor/delivery_monitor.nim @@ -20,7 +20,7 @@ proc new*( T: type DeliveryMonitor, storeClient: WakuStoreClient, wakuRelay: protocol.WakuRelay, - wakuLightpushClient: WakuLightPushClient, + wakuLightpushClient: WakuLightpushClient, wakuFilterClient: WakuFilterClient, ): Result[T, string] = ## storeClient is needed to give store visitility to DeliveryMonitor diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim index 8a67e46b1..adc9f03bd 100644 --- a/waku/node/delivery_monitor/send_monitor.nim +++ b/waku/node/delivery_monitor/send_monitor.nim @@ -171,9 +171,9 @@ proc processMessages(self: SendMonitor) {.async.} = let msg = deliveryInfo.msg if not self.wakuRelay.isNil(): debug "trying to publish again with wakuRelay", msgHash, pubsubTopic - let ret = await self.wakuRelay.publish(pubsubTopic, msg) - if ret == 0: - error "could not publish with wakuRelay.publish", msgHash, pubsubTopic + (await self.wakuRelay.publish(pubsubTopic, msg)).isOkOr: + error "could not publish with wakuRelay.publish", + msgHash, pubsubTopic, error = $error continue if not self.wakuLightpushClient.isNil(): diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7ce23914d..e10d705ff 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -20,7 +20,8 @@ import libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, - libp2p/transports/wstransport + libp2p/transports/wstransport, + libp2p/utility import ../waku_core, ../waku_core/topics/sharding, @@ -40,11 +41,10 @@ import ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, ../waku_rendezvous/protocol, - ../waku_lightpush/client as lightpush_client, - ../waku_lightpush/common, - ../waku_lightpush/protocol, - ../waku_lightpush/self_req_handler, - ../waku_lightpush/callbacks, + ../waku_lightpush_legacy/client as legacy_ligntpuhs_client, + ../waku_lightpush_legacy as legacy_lightpush_protocol, + ../waku_lightpush/client as ligntpuhs_client, + ../waku_lightpush as lightpush_protocol, ../waku_enr, ../waku_peer_exchange, ../waku_rln_relay, @@ -105,6 +105,8 @@ type wakuFilter*: waku_filter_v2.WakuFilter wakuFilterClient*: filter_client.WakuFilterClient wakuRlnRelay*: WakuRLNRelay + wakuLegacyLightPush*: WakuLegacyLightPush + wakuLegacyLightpushClient*: WakuLegacyLightPushClient wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange @@ -250,7 +252,6 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) = return proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - let msg_hash = topic.computeMessageHash(msg).to0xHex() let msgSizeKB = msg.payload.len / 1000 waku_node_messages.inc(labelValues = ["relay"]) @@ -979,53 +980,53 @@ proc setupStoreResume*(node: WakuNode) = return ## Waku lightpush - -proc mountLightPush*( +proc mountLegacyLightPush*( node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit ) {.async.} = - info "mounting light push" + info "mounting legacy light push" - var pushHandler = + let pushHandler = if node.wakuRelay.isNil: - debug "mounting lightpush without relay (nil)" - getNilPushHandler() + debug "mounting legacy lightpush without relay (nil)" + legacy_lightpush_protocol.getNilPushHandler() else: - debug "mounting lightpush with relay" + debug "mounting legacy lightpush with relay" let rlnPeer = if isNil(node.wakuRlnRelay): - debug "mounting lightpush without rln-relay" + debug "mounting legacy lightpush without rln-relay" none(WakuRLNRelay) else: - debug "mounting lightpush with rln-relay" + debug "mounting legacy lightpush with rln-relay" some(node.wakuRlnRelay) - getRelayPushHandler(node.wakuRelay, rlnPeer) + legacy_lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) - node.wakuLightPush = - WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) + node.wakuLegacyLightPush = + WakuLegacyLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) if node.started: # Node has started already. Let's start lightpush too. - await node.wakuLightPush.start() + await node.wakuLegacyLightPush.start() - node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) + node.switch.mount(node.wakuLegacyLightPush, protocolMatcher(WakuLegacyLightPushCodec)) -proc mountLightPushClient*(node: WakuNode) = - info "mounting light push client" +proc mountLegacyLightPushClient*(node: WakuNode) = + info "mounting legacy light push client" - node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) + node.wakuLegacyLightpushClient = + WakuLegacyLightPushClient.new(node.peerManager, node.rng) -proc lightpushPublish*( +proc legacyLightpushPublish*( node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage, peer: RemotePeerInfo, -): Future[WakuLightPushResult[string]] {.async, gcsafe.} = +): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. ## Returns whether relaying was successful or not. ## `WakuMessage` should contain a `contentTopic` field for light node ## functionality. - if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): - error "failed to publish message as lightpush not available" + if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): + error "failed to publish message as legacy lightpush not available" return err("Waku lightpush not available") let internalPublish = proc( @@ -1033,23 +1034,24 @@ proc lightpushPublish*( pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo, - ): Future[WakuLightPushResult[string]] {.async, gcsafe.} = + ): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - if not node.wakuLightpushClient.isNil(): - notice "publishing message with lightpush", + if not node.wakuLegacyLightpushClient.isNil(): + notice "publishing message with legacy lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, msg_hash = msgHash - return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) + return await node.wakuLegacyLightpushClient.publish(pubsubTopic, message, peer) - if not node.wakuLightPush.isNil(): - notice "publishing message with self hosted lightpush", + if not node.wakuLegacyLightPush.isNil(): + notice "publishing message with self hosted legacy lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, msg_hash = msgHash - return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) + return + await node.wakuLegacyLightPush.handleSelfLightPushRequest(pubsubTopic, message) try: if pubsubTopic.isSome(): return await internalPublish(node, pubsubTopic.get(), message, peer) @@ -1068,26 +1070,119 @@ proc lightpushPublish*( return err(getCurrentExceptionMsg()) # TODO: Move to application module (e.g., wakunode2.nim) -proc lightpushPublish*( +proc legacyLightpushPublish*( node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage -): Future[WakuLightPushResult[string]] {. - async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead" +): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {. + async, gcsafe, deprecated: "Use 'node.legacyLightpushPublish()' instead" .} = - if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): - error "failed to publish message as lightpush not available" - return err("waku lightpush not available") + if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil(): + error "failed to publish message as legacy lightpush not available" + return err("waku legacy lightpush not available") var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo) - if not node.wakuLightpushClient.isNil(): - peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) + if not node.wakuLegacyLightpushClient.isNil(): + peerOpt = node.peerManager.selectPeer(WakuLegacyLightPushCodec) if peerOpt.isNone(): let msg = "no suitable remote peers" error "failed to publish message", err = msg return err(msg) - elif not node.wakuLightPush.isNil(): + elif not node.wakuLegacyLightPush.isNil(): peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId)) - return await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get()) + return await node.legacyLightpushPublish(pubsubTopic, message, peer = peerOpt.get()) + +proc mountLightPush*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = + info "mounting light push" + + let pushHandler = + if node.wakuRelay.isNil(): + debug "mounting lightpush v2 without relay (nil)" + lightpush_protocol.getNilPushHandler() + else: + debug "mounting lightpush with relay" + let rlnPeer = + if isNil(node.wakuRlnRelay): + debug "mounting lightpush without rln-relay" + none(WakuRLNRelay) + else: + debug "mounting lightpush with rln-relay" + some(node.wakuRlnRelay) + lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) + + node.wakuLightPush = WakuLightPush.new( + node.peerManager, node.rng, pushHandler, node.wakuSharding, some(rateLimit) + ) + + if node.started: + # Node has started already. Let's start lightpush too. + await node.wakuLightPush.start() + + node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) + +proc mountLightPushClient*(node: WakuNode) = + info "mounting light push client" + + node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) + +proc lightpushPublishHandler( + node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage, + peer: RemotePeerInfo | PeerInfo, +): Future[lightpush_protocol.WakuLightPushResult] {.async.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + if not node.wakuLightpushClient.isNil(): + notice "publishing message with legacy lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) + + if not node.wakuLightPush.isNil(): + notice "publishing message with self hosted legacy lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash + return + await node.wakuLightPush.handleSelfLightPushRequest(some(pubsubTopic), message) + +proc lightpushPublish*( + node: WakuNode, + pubsubTopic: Option[PubsubTopic], + message: WakuMessage, + peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo), +): Future[lightpush_protocol.WakuLightPushResult] {.async.} = + if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): + error "failed to publish message as lightpush not available" + return lighpushErrorResult(SERVICE_NOT_AVAILABLE, "Waku lightpush not available") + + let toPeer: RemotePeerInfo = peerOpt.valueOr: + if not node.wakuLightPush.isNil(): + RemotePeerInfo.init(node.peerId()) + elif not node.wakuLightpushClient.isNil(): + node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + let msg = "no suitable remote peers" + error "failed to publish message", msg = msg + return lighpushErrorResult(NO_PEERS_TO_RELAY, msg) + else: + return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers") + + let pubsubForPublish = pubSubTopic.valueOr: + let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr: + let msg = "Invalid content-topic:" & $error + error "lightpush request handling error", error = msg + return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg) + + node.wakuSharding.getShard(parsedTopic).valueOr: + let msg = "Autosharding error: " & error + error "lightpush publish error", error = msg + return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg) + + return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer) ## Waku RLN Relay proc mountRlnRelay*( diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 1e24ad56d..c140c46d6 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -12,7 +12,7 @@ import ../../../waku_store_legacy/common, ../../../waku_store/common, ../../../waku_filter_v2, - ../../../waku_lightpush/common, + ../../../waku_lightpush_legacy/common, ../../../waku_relay, ../../../waku_peer_exchange, ../../../waku_node, @@ -85,6 +85,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, legacyStorePeers) + let legacyLightpushPeers = node.peerManager.wakuPeerStore + .peers(WakuLegacyLightPushCodec) + .mapIt( + ( + multiaddr: constructMultiaddrStr(it), + protocol: WakuLegacyLightPushCodec, + connected: it.connectedness == Connectedness.Connected, + origin: it.origin, + ) + ) + tuplesToWakuPeers(peers, legacyLightpushPeers) + let lightpushPeers = node.peerManager.wakuPeerStore.peers(WakuLightPushCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 325dcce06..6fe1b5e2b 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -12,6 +12,7 @@ import waku/waku_api/rest/debug/handlers as rest_debug_api, waku/waku_api/rest/relay/handlers as rest_relay_api, waku/waku_api/rest/filter/handlers as rest_filter_api, + waku/waku_api/rest/legacy_lightpush/handlers as rest_legacy_lightpush_api, waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, waku/waku_api/rest/store/handlers as rest_store_api, waku/waku_api/rest/legacy_store/handlers as rest_store_legacy_api, @@ -176,14 +177,17 @@ proc startRestServerProtocolSupport*( ## Light push API ## Install it either if lightpushnode (lightpush service node) is configured and client is mounted) ## or install it to be used with self-hosted lightpush service - if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or - (conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil): + if (conf.lightpushnode != "" and node.wakuLegacyLightpushClient != nil) or + (conf.lightpush and node.wakuLegacyLightPush != nil and node.wakuRelay != nil): let lightDiscoHandler = if not wakuDiscv5.isNil(): some(defaultDiscoveryHandler(wakuDiscv5, Lightpush)) else: none(DiscoveryHandler) + rest_legacy_lightpush_api.installLightPushRequestHandler( + router, node, lightDiscoHandler + ) rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler) else: restServerNotInstalledTab["lightpush"] = diff --git a/waku/waku_api/rest/legacy_lightpush/client.nim b/waku/waku_api/rest/legacy_lightpush/client.nim new file mode 100644 index 000000000..f0932e99f --- /dev/null +++ b/waku/waku_api/rest/legacy_lightpush/client.nim @@ -0,0 +1,23 @@ +{.push raises: [].} + +import + json, + std/sets, + stew/byteutils, + strformat, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types + +export types + +proc encodeBytes*(value: PushRequest, contentType: string): RestResult[seq[byte]] = + return encodeBytesOf(value, contentType) + +proc sendPushRequest*( + body: PushRequest +): RestResponse[string] {. + rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost +.} diff --git a/waku/waku_api/rest/legacy_lightpush/handlers.nim b/waku/waku_api/rest/legacy_lightpush/handlers.nim new file mode 100644 index 000000000..5d7c66bb1 --- /dev/null +++ b/waku/waku_api/rest/legacy_lightpush/handlers.nim @@ -0,0 +1,91 @@ +{.push raises: [].} + +import + std/strformat, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common + +import + waku/node/peer_manager, + waku/waku_lightpush_legacy/common, + ../../../waku_node, + ../../handlers, + ../serdes, + ../responses, + ../rest_serdes, + ./types + +export types + +logScope: + topics = "waku node rest legacy lightpush api" + +const FutTimeoutForPushRequestProcessing* = 5.seconds + +const NoPeerNoDiscoError = + RestApiResponse.serviceUnavailable("No suitable service peer & no discovery method") + +const NoPeerNoneFoundError = + RestApiResponse.serviceUnavailable("No suitable service peer & none discovered") + +proc useSelfHostedLightPush(node: WakuNode): bool = + return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil + +#### Request handlers + +const ROUTE_LIGHTPUSH = "/lightpush/v1/message" + +proc installLightPushRequestHandler*( + router: var RestRouter, + node: WakuNode, + discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler), +) = + router.api(MethodPost, ROUTE_LIGHTPUSH) do( + contentBody: Option[ContentBody] + ) -> RestApiResponse: + ## Send a request to push a waku message + debug "post", ROUTE_LIGHTPUSH, contentBody + + let decodedBody = decodeRequestBody[PushRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error() + + let req: PushRequest = decodedBody.value() + + let msg = req.message.toWakuMessage().valueOr: + return RestApiResponse.badRequest("Invalid message: " & $error) + + var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId) + if useSelfHostedLightPush(node): + discard + else: + peer = node.peerManager.selectPeer(WakuLegacyLightPushCodec).valueOr: + let handler = discHandler.valueOr: + return NoPeerNoDiscoError + + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError("No value in peerOp: " & $error) + + peerOp.valueOr: + return NoPeerNoneFoundError + + let subFut = node.legacyLightpushPublish(req.pubsubTopic, msg, peer) + + if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing): + error "Failed to request a message push due to timeout!" + return RestApiResponse.serviceUnavailable("Push request timed out") + + if subFut.value().isErr(): + if subFut.value().error == TooManyRequestsMessage: + return RestApiResponse.tooManyRequests("Request rate limmit reached") + + return RestApiResponse.serviceUnavailable( + fmt("Failed to request a message push: {subFut.value().error}") + ) + + return RestApiResponse.ok() diff --git a/waku/waku_api/rest/legacy_lightpush/types.nim b/waku/waku_api/rest/legacy_lightpush/types.nim new file mode 100644 index 000000000..60368403f --- /dev/null +++ b/waku/waku_api/rest/legacy_lightpush/types.nim @@ -0,0 +1,67 @@ +{.push raises: [].} + +import + std/[sets, strformat], + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client] + +import ../../../waku_core, ../relay/types as relay_types, ../serdes + +export relay_types + +#### Types + +type PushRequest* = object + pubsubTopic*: Option[PubSubTopic] + message*: RelayWakuMessage + +#### Serialization and deserialization + +proc writeValue*( + writer: var JsonWriter[RestJson], value: PushRequest +) {.raises: [IOError].} = + writer.beginRecord() + if value.pubsubTopic.isSome(): + writer.writeField("pubsubTopic", value.pubsubTopic.get()) + writer.writeField("message", value.message) + writer.endRecord() + +proc readValue*( + reader: var JsonReader[RestJson], value: var PushRequest +) {.raises: [SerializationError, IOError].} = + var + pubsubTopic = none(PubsubTopic) + message = none(RelayWakuMessage) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = + try: + fmt"Multiple `{fieldName}` fields found" + except CatchableError: + "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "PushRequest") + + case fieldName + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "message": + message = some(reader.readValue(RelayWakuMessage)) + else: + unrecognizedFieldWarning(value) + + if message.isNone(): + reader.raiseUnexpectedValue("Field `message` is missing") + + value = PushRequest( + pubsubTopic: + if pubsubTopic.isNone() or pubsubTopic.get() == "": + none(string) + else: + some(pubsubTopic.get()), + message: message.get(), + ) diff --git a/waku/waku_api/rest/lightpush/client.nim b/waku/waku_api/rest/lightpush/client.nim index 3e7f85524..abf832a3c 100644 --- a/waku/waku_api/rest/lightpush/client.nim +++ b/waku/waku_api/rest/lightpush/client.nim @@ -13,14 +13,11 @@ import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types export types -logScope: - topics = "waku node rest client v2" - proc encodeBytes*(value: PushRequest, contentType: string): RestResult[seq[byte]] = return encodeBytesOf(value, contentType) proc sendPushRequest*( body: PushRequest -): RestResponse[string] {. - rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost +): RestResponse[PushResponse] {. + rest, endpoint: "/lightpush/v3/message", meth: HttpMethod.MethodPost .} diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim index 6003c8a59..cbb94e16e 100644 --- a/waku/waku_api/rest/lightpush/handlers.nim +++ b/waku/waku_api/rest/lightpush/handlers.nim @@ -24,7 +24,7 @@ export types logScope: topics = "waku node rest lightpush api" -const futTimeoutForPushRequestProcessing* = 5.seconds +const FutTimeoutForPushRequestProcessing* = 5.seconds const NoPeerNoDiscoError = RestApiResponse.serviceUnavailable("No suitable service peer & no discovery method") @@ -33,11 +33,32 @@ const NoPeerNoneFoundError = RestApiResponse.serviceUnavailable("No suitable service peer & none discovered") proc useSelfHostedLightPush(node: WakuNode): bool = - return node.wakuLightPush != nil and node.wakuLightPushClient == nil + return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil + +proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode = + ## Lightpush status codes are matching HTTP status codes by design + return HttpCode(statusCode.int32) + +proc makeRestResponse(response: WakuLightPushResult): RestApiResponse = + var httpStatus: HttpCode = Http200 + var apiResponse: PushResponse + + if response.isOk(): + apiResponse.relayPeerCount = some(response.get()) + else: + httpStatus = convertErrorKindToHttpStatus(response.error().code) + apiResponse.statusDesc = response.error().desc + + let restResp = RestApiResponse.jsonResponse(apiResponse, status = httpStatus).valueOr: + error "An error ocurred while building the json respose: ", error = error + return RestApiResponse.internalServerError( + fmt("An error ocurred while building the json respose: {error}") + ) + + return restResp #### Request handlers - -const ROUTE_LIGHTPUSH* = "/lightpush/v1/message" +const ROUTE_LIGHTPUSH = "/lightpush/v3/message" proc installLightPushRequestHandler*( router: var RestRouter, @@ -50,21 +71,17 @@ proc installLightPushRequestHandler*( ## Send a request to push a waku message debug "post", ROUTE_LIGHTPUSH, contentBody - let decodedBody = decodeRequestBody[PushRequest](contentBody) - - if decodedBody.isErr(): - return decodedBody.error() - - let req: PushRequest = decodedBody.value() + let req: PushRequest = decodeRequestBody[PushRequest](contentBody).valueOr: + return RestApiResponse.badRequest("Invalid push request: " & $error) let msg = req.message.toWakuMessage().valueOr: return RestApiResponse.badRequest("Invalid message: " & $error) - var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId) + var toPeer = none(RemotePeerInfo) if useSelfHostedLightPush(node): discard else: - peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + let aPeer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr: let handler = discHandler.valueOr: return NoPeerNoDiscoError @@ -73,19 +90,12 @@ proc installLightPushRequestHandler*( peerOp.valueOr: return NoPeerNoneFoundError + toPeer = some(aPeer) - let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer) + let subFut = node.lightpushPublish(req.pubsubTopic, msg, toPeer) - if not await subFut.withTimeout(futTimeoutForPushRequestProcessing): + if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing): error "Failed to request a message push due to timeout!" return RestApiResponse.serviceUnavailable("Push request timed out") - if subFut.value().isErr(): - if subFut.value().error == TooManyRequestsMessage: - return RestApiResponse.tooManyRequests("Request rate limmit reached") - - return RestApiResponse.serviceUnavailable( - fmt("Failed to request a message push: {subFut.value().error}") - ) - - return RestApiResponse.ok() + return makeRestResponse(subFut.value()) diff --git a/waku/waku_api/rest/lightpush/types.nim b/waku/waku_api/rest/lightpush/types.nim index 60368403f..1fb87ab45 100644 --- a/waku/waku_api/rest/lightpush/types.nim +++ b/waku/waku_api/rest/lightpush/types.nim @@ -13,12 +13,16 @@ export relay_types #### Types -type PushRequest* = object - pubsubTopic*: Option[PubSubTopic] - message*: RelayWakuMessage +type + PushRequest* = object + pubsubTopic*: Option[PubSubTopic] + message*: RelayWakuMessage + + PushResponse* = object + statusDesc*: Option[string] + relayPeerCount*: Option[uint32] #### Serialization and deserialization - proc writeValue*( writer: var JsonWriter[RestJson], value: PushRequest ) {.raises: [IOError].} = @@ -65,3 +69,46 @@ proc readValue*( some(pubsubTopic.get()), message: message.get(), ) + +proc writeValue*( + writer: var JsonWriter[RestJson], value: PushResponse +) {.raises: [IOError].} = + writer.beginRecord() + if value.statusDesc.isSome(): + writer.writeField("statusDesc", value.statusDesc.get()) + if value.relayPeerCount.isSome(): + writer.writeField("relayPeerCount", value.relayPeerCount.get()) + writer.endRecord() + +proc readValue*( + reader: var JsonReader[RestJson], value: var PushResponse +) {.raises: [SerializationError, IOError].} = + var + statusDesc = none(string) + relayPeerCount = none(uint32) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = + try: + fmt"Multiple `{fieldName}` fields found" + except CatchableError: + "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "PushResponse") + + case fieldName + of "statusDesc": + statusDesc = some(reader.readValue(string)) + of "relayPeerCount": + relayPeerCount = some(reader.readValue(uint32)) + else: + unrecognizedFieldWarning(value) + + if relayPeerCount.isNone() and statusDesc.isNone(): + reader.raiseUnexpectedValue( + "Fields are missing, either `relayPeerCount` or `statusDesc` must be present" + ) + + value = PushResponse(statusDesc: statusDesc, relayPeerCount: relayPeerCount) diff --git a/waku/waku_core.nim b/waku/waku_core.nim index 33021d5a8..44dcce37d 100644 --- a/waku/waku_core.nim +++ b/waku/waku_core.nim @@ -4,6 +4,7 @@ import ./waku_core/message, ./waku_core/peers, ./waku_core/subscription, - ./waku_core/multiaddrstr + ./waku_core/multiaddrstr, + ./waku_core/codecs -export topics, time, message, peers, subscription, multiaddrstr +export topics, time, message, peers, subscription, multiaddrstr, codecs diff --git a/waku/waku_core/codecs.nim b/waku/waku_core/codecs.nim index 35a050b72..32a4af9d8 100644 --- a/waku/waku_core/codecs.nim +++ b/waku/waku_core/codecs.nim @@ -3,7 +3,8 @@ const WakuStoreCodec* = "/vac/waku/store-query/3.0.0" WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1" WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1" - WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" + WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0" + WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" WakuSyncCodec* = "/vac/waku/sync/1.0.0" WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0" WakuTransferCodec* = "/vac/waku/transfer/1.0.0" diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index a821a0474..fdd3d7948 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -257,7 +257,7 @@ proc parseUrlPeerAddr*( proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = ## Converts an ENR to dialable RemotePeerInfo - let typedR = ?enr.toTypedRecord() + let typedR = TypedRecord.fromRecord(enr) if not typedR.secp256k1.isSome(): return err("enr: no secp256k1 key in record") @@ -351,12 +351,8 @@ func hasUdpPort*(peer: RemotePeerInfo): bool = let enr = peer.enr.get() - typedEnrRes = enr.toTypedRecord() + typedEnr = TypedRecord.fromRecord(enr) - if typedEnrRes.isErr(): - return false - - let typedEnr = typedEnrRes.get() typedEnr.udp.isSome() or typedEnr.udp6.isSome() proc getAgent*(peer: RemotePeerInfo): string = diff --git a/waku/waku_lightpush.nim b/waku/waku_lightpush.nim index 373478fd9..a90557056 100644 --- a/waku/waku_lightpush.nim +++ b/waku/waku_lightpush.nim @@ -1,3 +1,3 @@ -import ./waku_lightpush/protocol +import ./waku_lightpush/[protocol, common, rpc, rpc_codec, callbacks, self_req_handler] -export protocol +export protocol, common, rpc, rpc_codec, callbacks, self_req_handler diff --git a/waku/waku_lightpush/callbacks.nim b/waku/waku_lightpush/callbacks.nim index 363ebfd3b..d6700412f 100644 --- a/waku/waku_lightpush/callbacks.nim +++ b/waku/waku_lightpush/callbacks.nim @@ -1,5 +1,7 @@ {.push raises: [].} +import stew/results + import ../waku_core, ../waku_relay, @@ -32,28 +34,28 @@ proc checkAndGenerateRLNProof*( proc getNilPushHandler*(): PushMessageHandler = return proc( peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = - return err("no waku relay found") + ): Future[WakuLightPushResult] {.async.} = + return lightpushResultInternalError("no waku relay found") proc getRelayPushHandler*( wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]() ): PushMessageHandler = return proc( peer: PeerId, pubsubTopic: string, message: WakuMessage - ): Future[WakuLightPushResult[void]] {.async.} = + ): Future[WakuLightPushResult] {.async.} = # append RLN proof - let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message) - if msgWithProof.isErr(): - return err(msgWithProof.error) + let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message).valueOr: + return lighpushErrorResult(OUT_OF_RLN_PROOF, error) - (await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr: - return err(error) + (await wakuRelay.validateMessage(pubSubTopic, msgWithProof)).isOkOr: + return lighpushErrorResult(INVALID_MESSAGE_ERROR, $error) - let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value) - if publishedCount == 0: - ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 + let publishedResult = await wakuRelay.publish(pubsubTopic, msgWithProof) + + if publishedResult.isErr(): let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - notice "Lightpush request has not been published to any peers", msg_hash = msgHash - return err(protocol_metrics.notPublishedAnyPeer) + notice "Lightpush request has not been published to any peers", + msg_hash = msgHash, reason = $publishedResult.error + return mapPubishingErrorToPushResult(publishedResult.error) - return ok() + return lightpushSuccessResult(publishedResult.get().uint32) diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 3e20bf9e3..7aa2d9fa9 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -30,80 +30,83 @@ proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) proc sendPushRequest( - wl: WakuLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo -): Future[WakuLightPushResult[void]] {.async, gcsafe.} = - let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec) - if connOpt.isNone(): - waku_lightpush_errors.inc(labelValues = [dialFailure]) - return err(dialFailure) - let connection = connOpt.get() + wl: WakuLightPushClient, req: LightPushRequest, peer: PeerId | RemotePeerInfo +): Future[WakuLightPushResult] {.async.} = + let connection = (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr: + waku_lightpush_v3_errors.inc(labelValues = [dialFailure]) + return lighpushErrorResult( + NO_PEERS_TO_RELAY, dialFailure & ": " & $peer & " is not accessible" + ) - let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req)) - await connection.writeLP(rpc.encode().buffer) + await connection.writeLP(req.encode().buffer) var buffer: seq[byte] try: buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: - return err("Exception reading: " & getCurrentExceptionMsg()) + error "Failed to read responose from peer", error = getCurrentExceptionMsg() + return lightpushResultInternalError( + "Failed to read response from peer: " & getCurrentExceptionMsg() + ) - let decodeRespRes = PushRPC.decode(buffer) - if decodeRespRes.isErr(): + let response = LightpushResponse.decode(buffer).valueOr: error "failed to decode response" - waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) - return err(decodeRpcFailure) + waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) + return lightpushResultInternalError(decodeRpcFailure) - let pushResponseRes = decodeRespRes.get() - if pushResponseRes.response.isNone(): - waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) - return err(emptyResponseBodyFailure) + if response.requestId != req.requestId and + response.statusCode != TOO_MANY_REQUESTS.uint32: + error "response failure, requestId mismatch", + requestId = req.requestId, responseRequestId = response.requestId + return lightpushResultInternalError("response failure, requestId mismatch") - let response = pushResponseRes.response.get() - if not response.isSuccess: - if response.info.isSome(): - return err(response.info.get()) - else: - return err("unknown failure") - - return ok() + return toPushResult(response) proc publish*( wl: WakuLightPushClient, - pubSubTopic: PubsubTopic, + pubSubTopic: Option[PubsubTopic] = none(PubsubTopic), message: WakuMessage, - peer: RemotePeerInfo, -): Future[WakuLightPushResult[string]] {.async, gcsafe.} = - ## On success, returns the msg_hash of the published message - let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex() - let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) - ?await wl.sendPushRequest(pushRequest, peer) + peer: PeerId | RemotePeerInfo, +): Future[WakuLightPushResult] {.async, gcsafe.} = + when peer is PeerId: + info "publish", + peerId = shortLog(peer), + msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex + else: + info "publish", + peerId = shortLog(peer.peerId), + msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex + + let pushRequest = LightpushRequest( + requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message + ) + let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) + obs.onMessagePublished(pubSubTopic.get(""), message) - notice "publishing message with lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - target_peer_id = peer.peerId, - msg_hash = msg_hash_hex_str - - return ok(msg_hash_hex_str) + return lightpushSuccessResult(publishedCount) proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage -): Future[WakuLightPushResult[void]] {.async, gcsafe.} = +): Future[WakuLightPushResult] {.async, gcsafe.} = ## This proc is similar to the publish one but in this case ## we don't specify a particular peer and instead we get it from peer manager info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: - return err("could not retrieve a peer supporting WakuLightPushCodec") + # TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side? + return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers") - let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) - ?await wl.sendPushRequest(pushRequest, peer) + let pushRequest = LightpushRequest( + requestId: generateRequestId(wl.rng), + pubSubTopic: some(pubSubTopic), + message: message, + ) + let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) for obs in wl.publishObservers: obs.onMessagePublished(pubSubTopic, message) - return ok() + return lightpushSuccessResult(publishedCount) diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim index cbdec411f..502e23883 100644 --- a/waku/waku_lightpush/common.nim +++ b/waku/waku_lightpush/common.nim @@ -1,15 +1,82 @@ {.push raises: [].} -import results, chronos, libp2p/peerid -import ../waku_core +import std/options, results, chronos, libp2p/peerid +import ../waku_core, ./rpc, ../waku_relay/protocol from ../waku_core/codecs import WakuLightPushCodec export WakuLightPushCodec -type WakuLightPushResult*[T] = Result[T, string] +type LightpushStatusCode* = enum + SUCCESS = uint32(200) + BAD_REQUEST = uint32(400) + PAYLOAD_TOO_LARGE = uint32(413) + INVALID_MESSAGE_ERROR = uint32(420) + UNSUPPORTED_PUBSUB_TOPIC = uint32(421) + TOO_MANY_REQUESTS = uint32(429) + INTERNAL_SERVER_ERROR = uint32(500) + NO_PEERS_TO_RELAY = uint32(503) + OUT_OF_RLN_PROOF = uint32(504) + SERVICE_NOT_AVAILABLE = uint32(505) + +type ErrorStatus* = tuple[code: LightpushStatusCode, desc: Option[string]] +type WakuLightPushResult* = Result[uint32, ErrorStatus] type PushMessageHandler* = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage -): Future[WakuLightPushResult[void]] {.async.} +): Future[WakuLightPushResult] {.async.} -const TooManyRequestsMessage* = "TOO_MANY_REQUESTS" +const TooManyRequestsMessage* = "Request rejected due to too many requests" + +func isSuccess*(response: LightPushResponse): bool = + return response.statusCode == LightpushStatusCode.SUCCESS.uint32 + +func toPushResult*(response: LightPushResponse): WakuLightPushResult = + if isSuccess(response): + return ok(response.relayPeerCount.get(0)) + else: + return err((response.statusCode.LightpushStatusCode, response.statusDesc)) + +func lightpushSuccessResult*(relayPeerCount: uint32): WakuLightPushResult = + return ok(relayPeerCount) + +func lightpushResultInternalError*(msg: string): WakuLightPushResult = + return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(msg))) + +func lighpushErrorResult*( + statusCode: LightpushStatusCode, desc: Option[string] +): WakuLightPushResult = + return err((statusCode, desc)) + +func lighpushErrorResult*( + statusCode: LightpushStatusCode, desc: string +): WakuLightPushResult = + return err((statusCode, some(desc))) + +func mapPubishingErrorToPushResult*( + publishOutcome: PublishOutcome +): WakuLightPushResult = + case publishOutcome + of NoTopicSpecified: + return err( + (LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Empty topic, skipping publish")) + ) + of DuplicateMessage: + return err( + (LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Dropping already-seen message")) + ) + of NoPeersToPublish: + return err( + ( + LightpushStatusCode.NO_PEERS_TO_RELAY, + some("No peers for topic, skipping publish"), + ) + ) + of CannotGenerateMessageId: + return err( + ( + LightpushStatusCode.INTERNAL_SERVER_ERROR, + some("Error generating message id, skipping publish"), + ) + ) + else: + return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, none[string]())) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 2967146db..76a37c6df 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -1,9 +1,17 @@ {.push raises: [].} -import std/options, results, stew/byteutils, chronicles, chronos, metrics, bearssl/rand +import + std/[options, strutils], + results, + stew/byteutils, + chronicles, + chronos, + metrics, + bearssl/rand import ../node/peer_manager/peer_manager, ../waku_core, + ../waku_core/topics/sharding, ./common, ./rpc, ./rpc_codec, @@ -18,55 +26,90 @@ type WakuLightPush* = ref object of LPProtocol peerManager*: PeerManager pushHandler*: PushMessageHandler requestRateLimiter*: RequestRateLimiter + sharding: Sharding proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] -): Future[PushRPC] {.async.} = - let reqDecodeRes = PushRPC.decode(buffer) - var - isSuccess = false - pushResponseInfo = "" - requestId = "" +): Future[LightPushResponse] {.async.} = + let reqDecodeRes = LightpushRequest.decode(buffer) + var isSuccess = false + var pushResponse: LightpushResponse if reqDecodeRes.isErr(): - pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error - elif reqDecodeRes.get().request.isNone(): - pushResponseInfo = emptyRequestBodyFailure + pushResponse = LightpushResponse( + requestId: "N/A", # due to decode failure we don't know requestId + statusCode: LightpushStatusCode.BAD_REQUEST.uint32, + statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error), + ) else: - let pushRpcRequest = reqDecodeRes.get() + let pushRequest = reqDecodeRes.get() - requestId = pushRpcRequest.requestId + let pubsubTopic = pushRequest.pubSubTopic.valueOr: + let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr: + let msg = "Invalid content-topic:" & $error + error "lightpush request handling error", error = msg + return LightpushResponse( + requestId: pushRequest.requestId, + statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32, + statusDesc: some(msg), + ) - let - request = pushRpcRequest.request + wl.sharding.getShard(parsedTopic).valueOr: + let msg = "Autosharding error: " & error + error "lightpush request handling error", error = msg + return LightpushResponse( + requestId: pushRequest.requestId, + statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32, + statusDesc: some(msg), + ) - pubSubTopic = request.get().pubSubTopic - message = request.get().message + # ensure checking topic will not cause error at gossipsub level + if pubsubTopic.isEmptyOrWhitespace(): + let msg = "topic must not be empty" + error "lightpush request handling error", error = msg + return LightPushResponse( + requestId: pushRequest.requestId, + statusCode: LightpushStatusCode.BAD_REQUEST.uint32, + statusDesc: some(msg), + ) - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"]) notice "handling lightpush request", my_peer_id = wl.peerManager.switch.peerInfo.peerId, peer_id = peerId, - requestId = requestId, - pubsubTopic = pubsubTopic, - msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), + requestId = pushRequest.requestId, + pubsubTopic = pushRequest.pubsubTopic, + msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex(), receivedTime = getNowInNanosecondTime() - let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) + let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message) + isSuccess = handleRes.isOk() - pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) + pushResponse = LightpushResponse( + requestId: pushRequest.requestId, + statusCode: + if isSuccess: + LightpushStatusCode.SUCCESS.uint32 + else: + handleRes.error.code.uint32, + statusDesc: + if isSuccess: + none[string]() + else: + handleRes.error.desc, + ) if not isSuccess: - waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) - error "failed to push message", error = pushResponseInfo - let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) - let rpc = PushRPC(requestId: requestId, response: some(response)) - return rpc + waku_lightpush_v3_errors.inc( + labelValues = [pushResponse.statusDesc.valueOr("unknown")] + ) + error "failed to push message", error = pushResponse.statusDesc + return pushResponse proc initProtocolHandler(wl: WakuLightPush) = proc handle(conn: Connection, proto: string) {.async.} = - var rpc: PushRPC + var rpc: LightpushResponse wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn): let buffer = await conn.readLp(DefaultMaxRpcSize) @@ -80,13 +123,13 @@ proc initProtocolHandler(wl: WakuLightPush) = peerId = conn.peerId, limit = $wl.requestRateLimiter.setting rpc = static( - PushRPC( + LightpushResponse( ## We will not copy and decode RPC buffer from stream only for requestId ## in reject case as it is comparably too expensive and opens possible ## attack surface requestId: "N/A", - response: - some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))), + statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32, + statusDesc: some(TooManyRequestsMessage), ) ) @@ -103,6 +146,7 @@ proc new*( peerManager: PeerManager, rng: ref rand.HmacDrbgContext, pushHandler: PushMessageHandler, + sharding: Sharding, rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = let wl = WakuLightPush( @@ -110,6 +154,7 @@ proc new*( peerManager: peerManager, pushHandler: pushHandler, requestRateLimiter: newRequestRateLimiter(rateLimitSetting), + sharding: sharding, ) wl.initProtocolHandler() setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting) diff --git a/waku/waku_lightpush/protocol_metrics.nim b/waku/waku_lightpush/protocol_metrics.nim index ce48a7d3d..c906cd587 100644 --- a/waku/waku_lightpush/protocol_metrics.nim +++ b/waku/waku_lightpush/protocol_metrics.nim @@ -2,9 +2,9 @@ import metrics -declarePublicGauge waku_lightpush_errors, +declarePublicGauge waku_lightpush_v3_errors, "number of lightpush protocol errors", ["type"] -declarePublicGauge waku_lightpush_messages, +declarePublicGauge waku_lightpush_v3_messages, "number of lightpush messages received", ["type"] # Error types (metric label values) diff --git a/waku/waku_lightpush/rpc.nim b/waku/waku_lightpush/rpc.nim index 33ba3f5e3..5a1a6647d 100644 --- a/waku/waku_lightpush/rpc.nim +++ b/waku/waku_lightpush/rpc.nim @@ -4,15 +4,13 @@ import std/options import ../waku_core type - PushRequest* = object - pubSubTopic*: string + LightpushRequest* = object + requestId*: string + pubSubTopic*: Option[PubsubTopic] message*: WakuMessage - PushResponse* = object - isSuccess*: bool - info*: Option[string] - - PushRPC* = object + LightPushResponse* = object requestId*: string - request*: Option[PushRequest] - response*: Option[PushResponse] + statusCode*: uint32 + statusDesc*: Option[string] + relayPeerCount*: Option[uint32] diff --git a/waku/waku_lightpush/rpc_codec.nim b/waku/waku_lightpush/rpc_codec.nim index 25d2bd210..53bdda4c0 100644 --- a/waku/waku_lightpush/rpc_codec.nim +++ b/waku/waku_lightpush/rpc_codec.nim @@ -5,73 +5,19 @@ import ../common/protobuf, ../waku_core, ./rpc const DefaultMaxRpcSize* = -1 -proc encode*(rpc: PushRequest): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, rpc.pubSubTopic) - pb.write3(2, rpc.message.encode()) - pb.finish3() - - pb - -proc decode*(T: type PushRequest, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = PushRequest() - - var pubSubTopic: PubsubTopic - if not ?pb.getField(1, pubSubTopic): - return err(ProtobufError.missingRequiredField("pubsub_topic")) - else: - rpc.pubSubTopic = pubSubTopic - - var messageBuf: seq[byte] - if not ?pb.getField(2, messageBuf): - return err(ProtobufError.missingRequiredField("message")) - else: - rpc.message = ?WakuMessage.decode(messageBuf) - - ok(rpc) - -proc encode*(rpc: PushResponse): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, uint64(rpc.isSuccess)) - pb.write3(2, rpc.info) - pb.finish3() - - pb - -proc decode*(T: type PushResponse, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = PushResponse() - - var isSuccess: uint64 - if not ?pb.getField(1, isSuccess): - return err(ProtobufError.missingRequiredField("is_success")) - else: - rpc.isSuccess = bool(isSuccess) - - var info: string - if not ?pb.getField(2, info): - rpc.info = none(string) - else: - rpc.info = some(info) - - ok(rpc) - -proc encode*(rpc: PushRPC): ProtoBuffer = +proc encode*(rpc: LightpushRequest): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, rpc.requestId) - pb.write3(2, rpc.request.map(encode)) - pb.write3(3, rpc.response.map(encode)) + pb.write3(20, rpc.pubSubTopic) + pb.write3(21, rpc.message.encode()) pb.finish3() - pb + return pb -proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] = +proc decode*(T: type LightpushRequest, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PushRPC() + var rpc = LightpushRequest() var requestId: string if not ?pb.getField(1, requestId): @@ -79,18 +25,57 @@ proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] = else: rpc.requestId = requestId - var requestBuffer: seq[byte] - if not ?pb.getField(2, requestBuffer): - rpc.request = none(PushRequest) + var pubSubTopic: PubsubTopic + if not ?pb.getField(20, pubSubTopic): + rpc.pubSubTopic = none(PubsubTopic) else: - let request = ?PushRequest.decode(requestBuffer) - rpc.request = some(request) + rpc.pubSubTopic = some(pubSubTopic) - var responseBuffer: seq[byte] - if not ?pb.getField(3, responseBuffer): - rpc.response = none(PushResponse) + var messageBuf: seq[byte] + if not ?pb.getField(21, messageBuf): + return err(ProtobufError.missingRequiredField("message")) else: - let response = ?PushResponse.decode(responseBuffer) - rpc.response = some(response) + rpc.message = ?WakuMessage.decode(messageBuf) - ok(rpc) + return ok(rpc) + +proc encode*(rpc: LightPushResponse): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.requestId) + pb.write3(10, rpc.statusCode) + pb.write3(11, rpc.statusDesc) + pb.write3(12, rpc.relayPeerCount) + pb.finish3() + + return pb + +proc decode*(T: type LightPushResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = LightPushResponse() + + var requestId: string + if not ?pb.getField(1, requestId): + return err(ProtobufError.missingRequiredField("request_id")) + else: + rpc.requestId = requestId + + var statusCode: uint32 + if not ?pb.getField(10, statusCode): + return err(ProtobufError.missingRequiredField("status_code")) + else: + rpc.statusCode = statusCode + + var statusDesc: string + if not ?pb.getField(11, statusDesc): + rpc.statusDesc = none(string) + else: + rpc.statusDesc = some(statusDesc) + + var relayPeerCount: uint32 + if not ?pb.getField(12, relayPeerCount): + rpc.relayPeerCount = none(uint32) + else: + rpc.relayPeerCount = some(relayPeerCount) + + return ok(rpc) diff --git a/waku/waku_lightpush/self_req_handler.nim b/waku/waku_lightpush/self_req_handler.nim index 410d5808a..fffced40a 100644 --- a/waku/waku_lightpush/self_req_handler.nim +++ b/waku/waku_lightpush/self_req_handler.nim @@ -20,8 +20,8 @@ import ../utils/requests proc handleSelfLightPushRequest*( - self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage -): Future[WakuLightPushResult[string]] {.async.} = + self: WakuLightPush, pubSubTopic: Option[PubsubTopic], message: WakuMessage +): Future[WakuLightPushResult] {.async.} = ## Handles the lightpush requests made by the node to itself. ## Normally used in REST-lightpush requests ## On success, returns the msg_hash of the published message. @@ -30,30 +30,14 @@ proc handleSelfLightPushRequest*( # provide self peerId as now this node is used directly, thus there is no light client sender peer. let selfPeerId = self.peerManager.switch.peerInfo.peerId - let req = PushRequest(pubSubTopic: pubSubTopic, message: message) - let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req)) + let req = LightpushRequest( + requestId: generateRequestId(self.rng), pubSubTopic: pubSubTopic, message: message + ) - let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer) + let response = await self.handleRequest(selfPeerId, req.encode().buffer) - if respRpc.response.isNone(): - waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) - return err(emptyResponseBodyFailure) - - let response = respRpc.response.get() - if not response.isSuccess: - if response.info.isSome(): - return err(response.info.get()) - else: - return err("unknown failure") - - let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex() - - notice "publishing message with self hosted lightpush", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - self_peer_id = selfPeerId, - msg_hash = msg_hash_hex_str - - return ok(msg_hash_hex_str) + return response.toPushResult() except Exception: - return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg()) + return lightPushResultInternalError( + "exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg() + ) diff --git a/waku/waku_lightpush_legacy.nim b/waku/waku_lightpush_legacy.nim new file mode 100644 index 000000000..f1b25cbbe --- /dev/null +++ b/waku/waku_lightpush_legacy.nim @@ -0,0 +1,5 @@ +import + ./waku_lightpush_legacy/ + [protocol, common, rpc, rpc_codec, callbacks, self_req_handler] + +export protocol, common, rpc, rpc_codec, callbacks, self_req_handler diff --git a/waku/waku_lightpush/README.md b/waku/waku_lightpush_legacy/README.md similarity index 100% rename from waku/waku_lightpush/README.md rename to waku/waku_lightpush_legacy/README.md diff --git a/waku/waku_lightpush_legacy/callbacks.nim b/waku/waku_lightpush_legacy/callbacks.nim new file mode 100644 index 000000000..f5a79eadc --- /dev/null +++ b/waku/waku_lightpush_legacy/callbacks.nim @@ -0,0 +1,62 @@ +{.push raises: [].} + +import + ../waku_core, + ../waku_relay, + ./common, + ./protocol_metrics, + ../waku_rln_relay, + ../waku_rln_relay/protocol_types + +import std/times, libp2p/peerid, stew/byteutils + +proc checkAndGenerateRLNProof*( + rlnPeer: Option[WakuRLNRelay], message: WakuMessage +): Result[WakuMessage, string] = + # check if the message already has RLN proof + if message.proof.len > 0: + return ok(message) + + if rlnPeer.isNone(): + notice "Publishing message without RLN proof" + return ok(message) + # generate and append RLN proof + let + time = getTime().toUnix() + senderEpochTime = float64(time) + var msgWithProof = message + rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr: + return err(error) + return ok(msgWithProof) + +proc getNilPushHandler*(): PushMessageHandler = + return proc( + peer: PeerId, pubsubTopic: string, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + return err("no waku relay found") + +proc getRelayPushHandler*( + wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]() +): PushMessageHandler = + return proc( + peer: PeerId, pubsubTopic: string, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + # append RLN proof + let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message) + if msgWithProof.isErr(): + return err(msgWithProof.error) + + (await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr: + return err(error) + + let publishResult = await wakuRelay.publish(pubsubTopic, msgWithProof.value) + if publishResult.isErr(): + ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + notice "Lightpush request has not been published to any peers", + msg_hash = msgHash, reason = $publishResult.error + # for legacy lightpush we do not detail the reason towards clients. All error during publish result in not-published-to-any-peer + # this let client of the legacy protocol to react as they did so far. + return err(protocol_metrics.notPublishedAnyPeer) + + return ok() diff --git a/waku/waku_lightpush_legacy/client.nim b/waku/waku_lightpush_legacy/client.nim new file mode 100644 index 000000000..c3b4a158e --- /dev/null +++ b/waku/waku_lightpush_legacy/client.nim @@ -0,0 +1,111 @@ +{.push raises: [].} + +import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils +import libp2p/peerid +import + ../waku_core/peers, + ../node/peer_manager, + ../node/delivery_monitor/publish_observer, + ../utils/requests, + ../waku_core, + ./common, + ./protocol_metrics, + ./rpc, + ./rpc_codec + +logScope: + topics = "waku lightpush legacy client" + +type WakuLegacyLightPushClient* = ref object + peerManager*: PeerManager + rng*: ref rand.HmacDrbgContext + publishObservers: seq[PublishObserver] + +proc new*( + T: type WakuLegacyLightPushClient, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, +): T = + WakuLegacyLightPushClient(peerManager: peerManager, rng: rng) + +proc addPublishObserver*(wl: WakuLegacyLightPushClient, obs: PublishObserver) = + wl.publishObservers.add(obs) + +proc sendPushRequest( + wl: WakuLegacyLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo +): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + let connOpt = await wl.peerManager.dialPeer(peer, WakuLegacyLightPushCodec) + if connOpt.isNone(): + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + let connection = connOpt.get() + + let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req)) + await connection.writeLP(rpc.encode().buffer) + + var buffer: seq[byte] + try: + buffer = await connection.readLp(DefaultMaxRpcSize.int) + except LPStreamRemoteClosedError: + return err("Exception reading: " & getCurrentExceptionMsg()) + + let decodeRespRes = PushRPC.decode(buffer) + if decodeRespRes.isErr(): + error "failed to decode response" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return err(decodeRpcFailure) + + let pushResponseRes = decodeRespRes.get() + if pushResponseRes.response.isNone(): + waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) + return err(emptyResponseBodyFailure) + + let response = pushResponseRes.response.get() + if not response.isSuccess: + if response.info.isSome(): + return err(response.info.get()) + else: + return err("unknown failure") + + return ok() + +proc publish*( + wl: WakuLegacyLightPushClient, + pubSubTopic: PubsubTopic, + message: WakuMessage, + peer: RemotePeerInfo, +): Future[WakuLightPushResult[string]] {.async, gcsafe.} = + ## On success, returns the msg_hash of the published message + let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex() + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) + ?await wl.sendPushRequest(pushRequest, peer) + + for obs in wl.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + notice "publishing message with lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msg_hash_hex_str + + return ok(msg_hash_hex_str) + +proc publishToAny*( + wl: WakuLegacyLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + ## This proc is similar to the publish one but in this case + ## we don't specify a particular peer and instead we get it from peer manager + + info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + + let peer = wl.peerManager.selectPeer(WakuLegacyLightPushCodec).valueOr: + return err("could not retrieve a peer supporting WakuLegacyLightPushCodec") + + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) + ?await wl.sendPushRequest(pushRequest, peer) + + for obs in wl.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + return ok() diff --git a/waku/waku_lightpush_legacy/common.nim b/waku/waku_lightpush_legacy/common.nim new file mode 100644 index 000000000..fcdf1814c --- /dev/null +++ b/waku/waku_lightpush_legacy/common.nim @@ -0,0 +1,15 @@ +{.push raises: [].} + +import results, chronos, libp2p/peerid +import ../waku_core + +from ../waku_core/codecs import WakuLegacyLightPushCodec +export WakuLegacyLightPushCodec + +type WakuLightPushResult*[T] = Result[T, string] + +type PushMessageHandler* = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult[void]] {.async.} + +const TooManyRequestsMessage* = "TOO_MANY_REQUESTS" diff --git a/waku/waku_lightpush_legacy/protocol.nim b/waku/waku_lightpush_legacy/protocol.nim new file mode 100644 index 000000000..feb6a1320 --- /dev/null +++ b/waku/waku_lightpush_legacy/protocol.nim @@ -0,0 +1,113 @@ +{.push raises: [].} + +import std/options, results, stew/byteutils, chronicles, chronos, metrics, bearssl/rand +import + ../node/peer_manager/peer_manager, + ../waku_core, + ./common, + ./rpc, + ./rpc_codec, + ./protocol_metrics, + ../common/rate_limit/request_limiter + +logScope: + topics = "waku lightpush legacy" + +type WakuLegacyLightPush* = ref object of LPProtocol + rng*: ref rand.HmacDrbgContext + peerManager*: PeerManager + pushHandler*: PushMessageHandler + requestRateLimiter*: RequestRateLimiter + +proc handleRequest*( + wl: WakuLegacyLightPush, peerId: PeerId, buffer: seq[byte] +): Future[PushRPC] {.async.} = + let reqDecodeRes = PushRPC.decode(buffer) + var + isSuccess = false + pushResponseInfo = "" + requestId = "" + + if reqDecodeRes.isErr(): + pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error + elif reqDecodeRes.get().request.isNone(): + pushResponseInfo = emptyRequestBodyFailure + else: + let pushRpcRequest = reqDecodeRes.get() + + requestId = pushRpcRequest.requestId + + let + request = pushRpcRequest.request + + pubSubTopic = request.get().pubSubTopic + message = request.get().message + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + notice "handling lightpush request", + peer_id = peerId, + requestId = requestId, + pubsubTopic = pubsubTopic, + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), + receivedTime = getNowInNanosecondTime() + + let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) + isSuccess = handleRes.isOk() + pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) + + if not isSuccess: + waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) + error "failed to push message", error = pushResponseInfo + let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) + let rpc = PushRPC(requestId: requestId, response: some(response)) + return rpc + +proc initProtocolHandler(wl: WakuLegacyLightPush) = + proc handle(conn: Connection, proto: string) {.async.} = + var rpc: PushRPC + wl.requestRateLimiter.checkUsageLimit(WakuLegacyLightPushCodec, conn): + let buffer = await conn.readLp(DefaultMaxRpcSize) + + waku_service_network_bytes.inc( + amount = buffer.len().int64, labelValues = [WakuLegacyLightPushCodec, "in"] + ) + + rpc = await handleRequest(wl, conn.peerId, buffer) + do: + debug "lightpush request rejected due rate limit exceeded", + peerId = conn.peerId, limit = $wl.requestRateLimiter.setting + + rpc = static( + PushRPC( + ## We will not copy and decode RPC buffer from stream only for requestId + ## in reject case as it is comparably too expensive and opens possible + ## attack surface + requestId: "N/A", + response: + some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))), + ) + ) + + await conn.writeLp(rpc.encode().buffer) + + ## For lightpush might not worth to measure outgoing trafic as it is only + ## small respones about success/failure + + wl.handler = handle + wl.codec = WakuLegacyLightPushCodec + +proc new*( + T: type WakuLegacyLightPush, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + pushHandler: PushMessageHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), +): T = + let wl = WakuLegacyLightPush( + rng: rng, + peerManager: peerManager, + pushHandler: pushHandler, + requestRateLimiter: newRequestRateLimiter(rateLimitSetting), + ) + wl.initProtocolHandler() + setServiceLimitMetric(WakuLegacyLightPushCodec, rateLimitSetting) + return wl diff --git a/waku/waku_lightpush_legacy/protocol_metrics.nim b/waku/waku_lightpush_legacy/protocol_metrics.nim new file mode 100644 index 000000000..ce48a7d3d --- /dev/null +++ b/waku/waku_lightpush_legacy/protocol_metrics.nim @@ -0,0 +1,19 @@ +{.push raises: [].} + +import metrics + +declarePublicGauge waku_lightpush_errors, + "number of lightpush protocol errors", ["type"] +declarePublicGauge waku_lightpush_messages, + "number of lightpush messages received", ["type"] + +# Error types (metric label values) +const + dialFailure* = "dial_failure" + decodeRpcFailure* = "decode_rpc_failure" + peerNotFoundFailure* = "peer_not_found_failure" + emptyRequestBodyFailure* = "empty_request_body_failure" + emptyResponseBodyFailure* = "empty_response_body_failure" + messagePushFailure* = "message_push_failure" + requestLimitReachedFailure* = "request_limit_reached_failure" + notPublishedAnyPeer* = "not_published_to_any_peer" diff --git a/waku/waku_lightpush_legacy/rpc.nim b/waku/waku_lightpush_legacy/rpc.nim new file mode 100644 index 000000000..33ba3f5e3 --- /dev/null +++ b/waku/waku_lightpush_legacy/rpc.nim @@ -0,0 +1,18 @@ +{.push raises: [].} + +import std/options +import ../waku_core + +type + PushRequest* = object + pubSubTopic*: string + message*: WakuMessage + + PushResponse* = object + isSuccess*: bool + info*: Option[string] + + PushRPC* = object + requestId*: string + request*: Option[PushRequest] + response*: Option[PushResponse] diff --git a/waku/waku_lightpush_legacy/rpc_codec.nim b/waku/waku_lightpush_legacy/rpc_codec.nim new file mode 100644 index 000000000..25d2bd210 --- /dev/null +++ b/waku/waku_lightpush_legacy/rpc_codec.nim @@ -0,0 +1,96 @@ +{.push raises: [].} + +import std/options +import ../common/protobuf, ../waku_core, ./rpc + +const DefaultMaxRpcSize* = -1 + +proc encode*(rpc: PushRequest): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.pubSubTopic) + pb.write3(2, rpc.message.encode()) + pb.finish3() + + pb + +proc decode*(T: type PushRequest, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = PushRequest() + + var pubSubTopic: PubsubTopic + if not ?pb.getField(1, pubSubTopic): + return err(ProtobufError.missingRequiredField("pubsub_topic")) + else: + rpc.pubSubTopic = pubSubTopic + + var messageBuf: seq[byte] + if not ?pb.getField(2, messageBuf): + return err(ProtobufError.missingRequiredField("message")) + else: + rpc.message = ?WakuMessage.decode(messageBuf) + + ok(rpc) + +proc encode*(rpc: PushResponse): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, uint64(rpc.isSuccess)) + pb.write3(2, rpc.info) + pb.finish3() + + pb + +proc decode*(T: type PushResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = PushResponse() + + var isSuccess: uint64 + if not ?pb.getField(1, isSuccess): + return err(ProtobufError.missingRequiredField("is_success")) + else: + rpc.isSuccess = bool(isSuccess) + + var info: string + if not ?pb.getField(2, info): + rpc.info = none(string) + else: + rpc.info = some(info) + + ok(rpc) + +proc encode*(rpc: PushRPC): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.request.map(encode)) + pb.write3(3, rpc.response.map(encode)) + pb.finish3() + + pb + +proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = PushRPC() + + var requestId: string + if not ?pb.getField(1, requestId): + return err(ProtobufError.missingRequiredField("request_id")) + else: + rpc.requestId = requestId + + var requestBuffer: seq[byte] + if not ?pb.getField(2, requestBuffer): + rpc.request = none(PushRequest) + else: + let request = ?PushRequest.decode(requestBuffer) + rpc.request = some(request) + + var responseBuffer: seq[byte] + if not ?pb.getField(3, responseBuffer): + rpc.response = none(PushResponse) + else: + let response = ?PushResponse.decode(responseBuffer) + rpc.response = some(response) + + ok(rpc) diff --git a/waku/waku_lightpush_legacy/self_req_handler.nim b/waku/waku_lightpush_legacy/self_req_handler.nim new file mode 100644 index 000000000..3c5d09a9c --- /dev/null +++ b/waku/waku_lightpush_legacy/self_req_handler.nim @@ -0,0 +1,59 @@ +{.push raises: [].} + +## Notice that the REST /lightpush requests normally assume that the node +## is acting as a lightpush-client that will trigger the service provider node +## to relay the message. +## In this module, we allow that a lightpush service node (full node) can be +## triggered directly through the REST /lightpush endpoint. +## The typical use case for that is when using `nwaku-compose`, +## which spawn a full service Waku node +## that could be used also as a lightpush client, helping testing and development. + +import results, chronos, chronicles, std/options, metrics, stew/byteutils +import + ../waku_core, + ./protocol, + ./common, + ./rpc, + ./rpc_codec, + ./protocol_metrics, + ../utils/requests + +proc handleSelfLightPushRequest*( + self: WakuLegacyLightPush, pubSubTopic: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult[string]] {.async.} = + ## Handles the lightpush requests made by the node to itself. + ## Normally used in REST-lightpush requests + ## On success, returns the msg_hash of the published message. + + try: + # provide self peerId as now this node is used directly, thus there is no light client sender peer. + let selfPeerId = self.peerManager.switch.peerInfo.peerId + + let req = PushRequest(pubSubTopic: pubSubTopic, message: message) + let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req)) + + let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer) + + if respRpc.response.isNone(): + waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) + return err(emptyResponseBodyFailure) + + let response = respRpc.response.get() + if not response.isSuccess: + if response.info.isSome(): + return err(response.info.get()) + else: + return err("unknown failure") + + let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex() + + notice "publishing message with self hosted lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + self_peer_id = selfPeerId, + msg_hash = msg_hash_hex_str + + return ok(msg_hash_hex_str) + except Exception: + return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg()) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 080f12edf..0222db0d1 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -5,7 +5,7 @@ {.push raises: [].} import - std/strformat, + std/[strformat, strutils], stew/byteutils, results, sequtils, @@ -13,7 +13,6 @@ import chronicles, metrics, libp2p/multihash, - libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, @@ -136,6 +135,13 @@ type onTopicHealthChange*: TopicHealthChangeHandler topicHealthLoopHandle*: Future[void] +# predefinition for more detailed results from publishing new message +type PublishOutcome* {.pure.} = enum + NoTopicSpecified + DuplicateMessage + NoPeersToPublish + CannotGenerateMessageId + proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -514,7 +520,10 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage -): Future[int] {.async.} = +): Future[Result[int, PublishOutcome]] {.async.} = + if pubsubTopic.isEmptyOrWhitespace(): + return err(NoTopicSpecified) + let data = message.encode().buffer let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() @@ -522,11 +531,13 @@ proc publish*( let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data) - if relayedPeerCount > 0: - for obs in w.publishObservers: - obs.onMessagePublished(pubSubTopic, message) + if relayedPeerCount <= 0: + return err(NoPeersToPublish) - return relayedPeerCount + for obs in w.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + return ok(relayedPeerCount) proc getNumConnectedPeers*( w: WakuRelay, pubsubTopic: PubsubTopic