diff --git a/apps/liteprotocoltester/legacy_publisher.nim b/apps/liteprotocoltester/legacy_publisher.nim new file mode 100644 index 000000000..12733ad2d --- /dev/null +++ b/apps/liteprotocoltester/legacy_publisher.nim @@ -0,0 +1,24 @@ +import chronos, results, options +import waku/[waku_node, waku_core] +import publisher_base + +type LegacyPublisher* = ref object of PublisherBase + +proc new*(T: type LegacyPublisher, wakuNode: WakuNode): T = + if isNil(wakuNode.wakuLegacyLightpushClient): + wakuNode.mountLegacyLightPushClient() + + return LegacyPublisher(wakuNode: wakuNode) + +method send*( + self: LegacyPublisher, + topic: PubsubTopic, + message: WakuMessage, + servicePeer: RemotePeerInfo, +): Future[Result[void, string]] {.async.} = + # when error it must return original error desc due the text is used for distinction between error types in metrics. + discard ( + await self.wakuNode.legacyLightpushPublish(some(topic), message, servicePeer) + ).valueOr: + return err(error) + return ok() diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index 991e9ba78..598d1a7ec 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -28,8 +28,8 @@ import waku_core/multiaddrstr, ], ./tester_config, - ./lightpush_publisher, - ./filter_subscriber, + ./publisher, + ./receiver, ./diagnose_connections, ./service_peer_management @@ -69,13 +69,13 @@ when isMainModule: ## - override according to tester functionality ## - var wakuConf: WakuNodeConf + var wConf: WakuNodeConf if conf.configFile.isSome(): try: var configFile {.threadvar.}: InputFile configFile = conf.configFile.get() - wakuConf = WakuNodeConf.load( + wConf = WakuNodeConf.load( version = versionString, printUsage = false, secondarySources = proc( @@ -88,36 +88,36 @@ when isMainModule: error "Loading Waku configuration failed", error = getCurrentExceptionMsg() quit(QuitFailure) - wakuConf.logLevel = conf.logLevel - wakuConf.logFormat = conf.logFormat - wakuConf.nat = conf.nat - wakuConf.maxConnections = 500 - wakuConf.restAddress = conf.restAddress - wakuConf.restPort = conf.restPort - wakuConf.restAllowOrigin = conf.restAllowOrigin + wConf.logLevel = conf.logLevel + wConf.logFormat = conf.logFormat + wConf.nat = conf.nat + wConf.maxConnections = 500 + wConf.restAddress = conf.restAddress + wConf.restPort = conf.restPort + wConf.restAllowOrigin = conf.restAllowOrigin - wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] + wConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] - wakuConf.shards = @[conf.shard] - wakuConf.contentTopics = conf.contentTopics - wakuConf.clusterId = conf.clusterId + wConf.shards = @[conf.shard] + wConf.contentTopics = conf.contentTopics + wConf.clusterId = conf.clusterId ## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc... - wakuConf.metricsServer = true - wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0") - wakuConf.metricsServerPort = conf.metricsPort + wConf.metricsServer = true + wConf.metricsServerAddress = parseIpAddress("0.0.0.0") + wConf.metricsServerPort = conf.metricsPort # If bootstrap option is chosen we expect our clients will not mounted # so we will mount PeerExchange manually to gather possible service peers, # if got some we will mount the client protocols afterward. - wakuConf.peerExchange = false - wakuConf.relay = false - wakuConf.filter = false - wakuConf.lightpush = false - wakuConf.store = false + wConf.peerExchange = false + wConf.relay = false + wConf.filter = false + wConf.lightpush = false + wConf.store = false - wakuConf.rest = false - wakuConf.relayServiceRatio = "40:60" + wConf.rest = false + wConf.relayServiceRatio = "40:60" # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it # It will always be called from main thread anyway. @@ -126,12 +126,20 @@ when isMainModule: nodeHealthMonitor = WakuNodeHealthMonitor() nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING) - let restServer = rest_server_builder.startRestServerEssentials( - nodeHealthMonitor, wakuConf - ).valueOr: - error "Starting esential REST server failed.", error = $error + let wakuConf = wConf.toWakuConf().valueOr: + error "Waku configuration failed", error = error quit(QuitFailure) + let restServer: WakuRestServerRef = + if wakuConf.restServerConf.isSome(): + rest_server_builder.startRestServerEssentials( + nodeHealthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift + ).valueOr: + error "Starting essential REST server failed.", error = $error + quit(QuitFailure) + else: + nil + var wakuApp = Waku.new(wakuConf).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) @@ -144,15 +152,27 @@ when isMainModule: error "Starting waku failed", error = error quit(QuitFailure) - rest_server_builder.startRestServerProtocolSupport( - restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf - ).isOkOr: - error "Starting protocols support REST server failed.", error = $error - quit(QuitFailure) + if wakuConf.restServerConf.isSome(): + rest_server_builder.startRestServerProtocolSupport( + restServer, + wakuApp.node, + wakuApp.wakuDiscv5, + wakuConf.restServerConf.get(), + wakuConf.relay, + wakuConf.lightPush, + wakuConf.clusterId, + wakuConf.shards, + wakuConf.contentTopics, + ).isOkOr: + error "Starting protocols support REST server failed.", error = $error + quit(QuitFailure) - wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr: - error "Starting monitoring and external interfaces failed", error = error - quit(QuitFailure) + if wakuConf.metricsServerConf.isSome(): + wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging( + wakuConf.metricsServerConf.get(), wakuConf.portsShift + ).valueOr: + error "Starting monitoring and external interfaces failed", error = error + quit(QuitFailure) nodeHealthMonitor.setOverallHealth(HealthStatus.READY) @@ -199,12 +219,8 @@ when isMainModule: info "Node setup complete" - var codec = WakuLightPushCodec + let codec = conf.getCodec() # mounting relevant client, for PX filter client must be mounted ahead - if conf.testFunc == TesterFunctionality.SENDER: - codec = WakuLightPushCodec - else: - codec = WakuFilterSubscribeCodec var lookForServiceNode = false var serviceNodePeerInfo: RemotePeerInfo @@ -241,6 +257,6 @@ when isMainModule: if conf.testFunc == TesterFunctionality.SENDER: setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo) else: - setupAndSubscribe(wakuApp.node, conf, serviceNodePeerInfo) + setupAndListen(wakuApp.node, conf, serviceNodePeerInfo) runForever() diff --git a/apps/liteprotocoltester/lpt_supervisor.py b/apps/liteprotocoltester/lpt_supervisor.py index 24c395b0a..7d882afd2 100755 --- a/apps/liteprotocoltester/lpt_supervisor.py +++ b/apps/liteprotocoltester/lpt_supervisor.py @@ -24,8 +24,8 @@ def run_tester_node(predefined_test_env): return os.system(script_cmd) if __name__ == "__main__": - if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER"]: - print("Error: First argument must be either 'RECEIVER' or 'SENDER'") + if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER", "SENDERV3"]: + print("Error: First argument must be either 'RECEIVER' or 'SENDER' or 'SENDERV3'") sys.exit(1) predefined_test_env_file = '/usr/bin/infra.env' diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/publisher.nim similarity index 92% rename from apps/liteprotocoltester/lightpush_publisher.nim rename to apps/liteprotocoltester/publisher.nim index d79e68590..d8031473d 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/publisher.nim @@ -21,14 +21,17 @@ import ./tester_message, ./lpt_metrics, ./diagnose_connections, - ./service_peer_management + ./service_peer_management, + ./publisher_base, + ./legacy_publisher, + ./v3_publisher randomize() type SizeRange* = tuple[min: uint64, max: uint64] -var RANDOM_PALYLOAD {.threadvar.}: seq[byte] -RANDOM_PALYLOAD = urandom(1024 * 1024) +var RANDOM_PAYLOAD {.threadvar.}: seq[byte] +RANDOM_PAYLOAD = urandom(1024 * 1024) # 1MiB of random payload to be used to extend message proc prepareMessage( @@ -59,9 +62,8 @@ proc prepareMessage( if renderSize < len(contentPayload).uint64: renderSize = len(contentPayload).uint64 - let finalPayload = concat( - contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64] - ) + let finalPayload = + concat(contentPayload, RANDOM_PAYLOAD[0 .. renderSize - len(contentPayload).uint64]) let message = WakuMessage( payload: finalPayload, # content of the message contentTopic: contentTopic, # content topic to publish to @@ -108,6 +110,7 @@ proc reportSentMessages() = proc publishMessages( wakuNode: WakuNode, + publisher: PublisherBase, servicePeer: RemotePeerInfo, lightpushPubsubTopic: PubsubTopic, lightpushContentTopic: ContentTopic, @@ -148,9 +151,7 @@ proc publishMessages( let publishStartTime = Moment.now() - let wlpRes = await wakuNode.legacyLightpushPublish( - some(lightpushPubsubTopic), message, actualServicePeer - ) + let wlpRes = await publisher.send(lightpushPubsubTopic, message, actualServicePeer) let publishDuration = Moment.now() - publishStartTime @@ -213,10 +214,13 @@ proc publishMessages( proc setupAndPublish*( wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo ) = - if isNil(wakuNode.wakuLightpushClient): - # if we have not yet initialized lightpush client, then do it as the only way we can get here is - # by having a service peer discovered. - wakuNode.mountLegacyLightPushClient() + var publisher: PublisherBase + if conf.lightpushVersion == LightpushVersion.LEGACY: + info "Using legacy lightpush protocol for publishing messages" + publisher = LegacyPublisher.new(wakuNode) + else: + info "Using lightpush v3 protocol for publishing messages" + publisher = V3Publisher.new(wakuNode) # give some time to receiver side to set up let waitTillStartTesting = conf.startPublishingAfter.seconds @@ -257,6 +261,7 @@ proc setupAndPublish*( # Start maintaining subscription asyncSpawn publishMessages( wakuNode, + publisher, servicePeer, conf.getPubsubTopic(), conf.contentTopics[0], diff --git a/apps/liteprotocoltester/publisher_base.nim b/apps/liteprotocoltester/publisher_base.nim new file mode 100644 index 000000000..de88d82f8 --- /dev/null +++ b/apps/liteprotocoltester/publisher_base.nim @@ -0,0 +1,14 @@ +import chronos, results +import waku/[waku_node, waku_core] + +type PublisherBase* = ref object of RootObj + wakuNode*: WakuNode + +method send*( + self: PublisherBase, + topic: PubsubTopic, + message: WakuMessage, + servicePeer: RemotePeerInfo, +): Future[Result[void, string]] {.base, async.} = + discard + # when error it must return original error desc due the text is used for distinction between error types in metrics. diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/receiver.nim similarity index 99% rename from apps/liteprotocoltester/filter_subscriber.nim rename to apps/liteprotocoltester/receiver.nim index fbb11c92e..f0f41b1c5 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/receiver.nim @@ -116,7 +116,7 @@ proc maintainSubscription( await sleepAsync(30.seconds) # Subscription maintenance interval -proc setupAndSubscribe*( +proc setupAndListen*( wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo ) = if isNil(wakuNode.wakuFilterClient): diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh index 4a80ca460..3c2d60e2f 100755 --- a/apps/liteprotocoltester/run_tester_node.sh +++ b/apps/liteprotocoltester/run_tester_node.sh @@ -25,7 +25,12 @@ fi FUNCTION=$2 if [ "${FUNCTION}" = "SENDER" ]; then - FUNCTION=--test-func=SENDER + FUNCTION="--test-func=SENDER --lightpush-version=LEGACY" + SERVICENAME=lightpush-service +fi + +if [ "${FUNCTION}" = "SENDERV3" ]; then + FUNCTION="--test-func=SENDER --lightpush-version=V3" SERVICENAME=lightpush-service fi diff --git a/apps/liteprotocoltester/run_tester_node_at_infra.sh b/apps/liteprotocoltester/run_tester_node_at_infra.sh index e926875aa..db26eb091 100644 --- a/apps/liteprotocoltester/run_tester_node_at_infra.sh +++ b/apps/liteprotocoltester/run_tester_node_at_infra.sh @@ -26,7 +26,15 @@ fi FUNCTION=$2 if [ "${FUNCTION}" = "SENDER" ]; then - FUNCTION=--test-func=SENDER + FUNCTION="--test-func=SENDER --lightpush-version=LEGACY" + SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}} + NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"} + NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"} + METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}" +fi + +if [ "${FUNCTION}" = "SENDERV3" ]; then + FUNCTION="--test-func=SENDER --lightpush-version=V3" SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}} NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"} NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"} diff --git a/apps/liteprotocoltester/run_tester_node_on_fleet.sh b/apps/liteprotocoltester/run_tester_node_on_fleet.sh index 538a890e6..533f5b1bf 100644 --- a/apps/liteprotocoltester/run_tester_node_on_fleet.sh +++ b/apps/liteprotocoltester/run_tester_node_on_fleet.sh @@ -26,7 +26,15 @@ fi FUNCTION=$2 if [ "${FUNCTION}" = "SENDER" ]; then - FUNCTION=--test-func=SENDER + FUNCTION="--test-func=SENDER --lightpush-version=LEGACY" + SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}} + NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"} + NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"} + METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}" +fi + +if [ "${FUNCTION}" = "SENDERV3" ]; then + FUNCTION="--test-func=SENDER --lightpush-version=V3" SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}} NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"} NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"} diff --git a/apps/liteprotocoltester/service_peer_management.nim b/apps/liteprotocoltester/service_peer_management.nim index a303c3c58..7d79e0f36 100644 --- a/apps/liteprotocoltester/service_peer_management.nim +++ b/apps/liteprotocoltester/service_peer_management.nim @@ -158,9 +158,7 @@ proc tryCallAllPxPeers*( proc pxLookupServiceNode*( node: WakuNode, conf: LiteProtocolTesterConf ): Future[Result[bool, void]] {.async.} = - var codec: string = WakuLightPushCodec - if conf.testFunc == TesterFunctionality.RECEIVER: - codec = WakuFilterSubscribeCodec + let codec: string = conf.getCodec() if node.wakuPeerExchange.isNil(): let peerExchangeNode = translateToRemotePeerInfo(conf.bootstrapNode).valueOr: diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index eccaafc06..c06a970b1 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -33,6 +33,10 @@ type TesterFunctionality* = enum SENDER # pumps messages to the network RECEIVER # gather and analyze messages from the network +type LightpushVersion* = enum + LEGACY # legacy lightpush protocol + V3 # lightpush v3 protocol + type LiteProtocolTesterConf* = object configFile* {. desc: @@ -80,6 +84,12 @@ type LiteProtocolTesterConf* = object name: "test-func" .}: TesterFunctionality + lightpushVersion* {. + desc: "Version of the sender to use. Supported values: legacy, v3.", + defaultValue: LightpushVersion.LEGACY, + name: "lightpush-version" + .}: LightpushVersion + numMessages* {. desc: "Number of messages to send.", defaultValue: 120, name: "num-messages" .}: uint32 @@ -190,4 +200,14 @@ proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] = proc getPubsubTopic*(conf: LiteProtocolTesterConf): PubsubTopic = return $RelayShard(clusterId: conf.clusterId, shardId: conf.shard) +proc getCodec*(conf: LiteProtocolTesterConf): string = + return + if conf.testFunc == TesterFunctionality.RECEIVER: + WakuFilterSubscribeCodec + else: + if conf.lightpushVersion == LightpushVersion.LEGACY: + WakuLegacyLightPushCodec + else: + WakuLightPushCodec + {.pop.} diff --git a/apps/liteprotocoltester/v3_publisher.nim b/apps/liteprotocoltester/v3_publisher.nim new file mode 100644 index 000000000..74a3fdd05 --- /dev/null +++ b/apps/liteprotocoltester/v3_publisher.nim @@ -0,0 +1,29 @@ +import results, options, chronos +import waku/[waku_node, waku_core, waku_lightpush] +import publisher_base + +type V3Publisher* = ref object of PublisherBase + +proc new*(T: type V3Publisher, wakuNode: WakuNode): T = + if isNil(wakuNode.wakuLightpushClient): + wakuNode.mountLightPushClient() + + return V3Publisher(wakuNode: wakuNode) + +method send*( + self: V3Publisher, + topic: PubsubTopic, + message: WakuMessage, + servicePeer: RemotePeerInfo, +): Future[Result[void, string]] {.async.} = + # when error it must return original error desc due the text is used for distinction between error types in metrics. + discard ( + await self.wakuNode.lightpushPublish(some(topic), message, some(servicePeer)) + ).valueOr: + if error.code == NO_PEERS_TO_RELAY and + error.desc != some("No peers for topic, skipping publish"): + # TODO: We need better separation of errors happening on the client side or the server side.- + return err("dial_failure") + else: + return err($error.code) + return ok()