mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
feat: lighptush v3 for lite-protocol-tester (#3455)
* Upgrade lpt to new config methods * Make choice of legacy and v3 lightpush configurable on cli * Adjust runner script to allow easy lightpush version selection * Prepare selectable lightpush for infra env runs * Fix misused result vs return * Fixes and more explanatory comments added * Fix ~pure virtual~ notice to =discard
This commit is contained in:
parent
b998430d52
commit
3f3c594885
24
apps/liteprotocoltester/legacy_publisher.nim
Normal file
24
apps/liteprotocoltester/legacy_publisher.nim
Normal file
@ -0,0 +1,24 @@
|
||||
import chronos, results, options
|
||||
import waku/[waku_node, waku_core]
|
||||
import publisher_base
|
||||
|
||||
type LegacyPublisher* = ref object of PublisherBase
|
||||
|
||||
proc new*(T: type LegacyPublisher, wakuNode: WakuNode): T =
|
||||
if isNil(wakuNode.wakuLegacyLightpushClient):
|
||||
wakuNode.mountLegacyLightPushClient()
|
||||
|
||||
return LegacyPublisher(wakuNode: wakuNode)
|
||||
|
||||
method send*(
|
||||
self: LegacyPublisher,
|
||||
topic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
servicePeer: RemotePeerInfo,
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
# when error it must return original error desc due the text is used for distinction between error types in metrics.
|
||||
discard (
|
||||
await self.wakuNode.legacyLightpushPublish(some(topic), message, servicePeer)
|
||||
).valueOr:
|
||||
return err(error)
|
||||
return ok()
|
||||
@ -28,8 +28,8 @@ import
|
||||
waku_core/multiaddrstr,
|
||||
],
|
||||
./tester_config,
|
||||
./lightpush_publisher,
|
||||
./filter_subscriber,
|
||||
./publisher,
|
||||
./receiver,
|
||||
./diagnose_connections,
|
||||
./service_peer_management
|
||||
|
||||
@ -69,13 +69,13 @@ when isMainModule:
|
||||
## - override according to tester functionality
|
||||
##
|
||||
|
||||
var wakuConf: WakuNodeConf
|
||||
var wConf: WakuNodeConf
|
||||
|
||||
if conf.configFile.isSome():
|
||||
try:
|
||||
var configFile {.threadvar.}: InputFile
|
||||
configFile = conf.configFile.get()
|
||||
wakuConf = WakuNodeConf.load(
|
||||
wConf = WakuNodeConf.load(
|
||||
version = versionString,
|
||||
printUsage = false,
|
||||
secondarySources = proc(
|
||||
@ -88,36 +88,36 @@ when isMainModule:
|
||||
error "Loading Waku configuration failed", error = getCurrentExceptionMsg()
|
||||
quit(QuitFailure)
|
||||
|
||||
wakuConf.logLevel = conf.logLevel
|
||||
wakuConf.logFormat = conf.logFormat
|
||||
wakuConf.nat = conf.nat
|
||||
wakuConf.maxConnections = 500
|
||||
wakuConf.restAddress = conf.restAddress
|
||||
wakuConf.restPort = conf.restPort
|
||||
wakuConf.restAllowOrigin = conf.restAllowOrigin
|
||||
wConf.logLevel = conf.logLevel
|
||||
wConf.logFormat = conf.logFormat
|
||||
wConf.nat = conf.nat
|
||||
wConf.maxConnections = 500
|
||||
wConf.restAddress = conf.restAddress
|
||||
wConf.restPort = conf.restPort
|
||||
wConf.restAllowOrigin = conf.restAllowOrigin
|
||||
|
||||
wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
|
||||
wConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
|
||||
|
||||
wakuConf.shards = @[conf.shard]
|
||||
wakuConf.contentTopics = conf.contentTopics
|
||||
wakuConf.clusterId = conf.clusterId
|
||||
wConf.shards = @[conf.shard]
|
||||
wConf.contentTopics = conf.contentTopics
|
||||
wConf.clusterId = conf.clusterId
|
||||
## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc...
|
||||
|
||||
wakuConf.metricsServer = true
|
||||
wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
|
||||
wakuConf.metricsServerPort = conf.metricsPort
|
||||
wConf.metricsServer = true
|
||||
wConf.metricsServerAddress = parseIpAddress("0.0.0.0")
|
||||
wConf.metricsServerPort = conf.metricsPort
|
||||
|
||||
# If bootstrap option is chosen we expect our clients will not mounted
|
||||
# so we will mount PeerExchange manually to gather possible service peers,
|
||||
# if got some we will mount the client protocols afterward.
|
||||
wakuConf.peerExchange = false
|
||||
wakuConf.relay = false
|
||||
wakuConf.filter = false
|
||||
wakuConf.lightpush = false
|
||||
wakuConf.store = false
|
||||
wConf.peerExchange = false
|
||||
wConf.relay = false
|
||||
wConf.filter = false
|
||||
wConf.lightpush = false
|
||||
wConf.store = false
|
||||
|
||||
wakuConf.rest = false
|
||||
wakuConf.relayServiceRatio = "40:60"
|
||||
wConf.rest = false
|
||||
wConf.relayServiceRatio = "40:60"
|
||||
|
||||
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||
# It will always be called from main thread anyway.
|
||||
@ -126,12 +126,20 @@ when isMainModule:
|
||||
nodeHealthMonitor = WakuNodeHealthMonitor()
|
||||
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
|
||||
|
||||
let restServer = rest_server_builder.startRestServerEssentials(
|
||||
nodeHealthMonitor, wakuConf
|
||||
).valueOr:
|
||||
error "Starting esential REST server failed.", error = $error
|
||||
let wakuConf = wConf.toWakuConf().valueOr:
|
||||
error "Waku configuration failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let restServer: WakuRestServerRef =
|
||||
if wakuConf.restServerConf.isSome():
|
||||
rest_server_builder.startRestServerEssentials(
|
||||
nodeHealthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
|
||||
).valueOr:
|
||||
error "Starting essential REST server failed.", error = $error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
nil
|
||||
|
||||
var wakuApp = Waku.new(wakuConf).valueOr:
|
||||
error "Waku initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
@ -144,15 +152,27 @@ when isMainModule:
|
||||
error "Starting waku failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
rest_server_builder.startRestServerProtocolSupport(
|
||||
restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
|
||||
).isOkOr:
|
||||
error "Starting protocols support REST server failed.", error = $error
|
||||
quit(QuitFailure)
|
||||
if wakuConf.restServerConf.isSome():
|
||||
rest_server_builder.startRestServerProtocolSupport(
|
||||
restServer,
|
||||
wakuApp.node,
|
||||
wakuApp.wakuDiscv5,
|
||||
wakuConf.restServerConf.get(),
|
||||
wakuConf.relay,
|
||||
wakuConf.lightPush,
|
||||
wakuConf.clusterId,
|
||||
wakuConf.shards,
|
||||
wakuConf.contentTopics,
|
||||
).isOkOr:
|
||||
error "Starting protocols support REST server failed.", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr:
|
||||
error "Starting monitoring and external interfaces failed", error = error
|
||||
quit(QuitFailure)
|
||||
if wakuConf.metricsServerConf.isSome():
|
||||
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(
|
||||
wakuConf.metricsServerConf.get(), wakuConf.portsShift
|
||||
).valueOr:
|
||||
error "Starting monitoring and external interfaces failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
|
||||
|
||||
@ -199,12 +219,8 @@ when isMainModule:
|
||||
|
||||
info "Node setup complete"
|
||||
|
||||
var codec = WakuLightPushCodec
|
||||
let codec = conf.getCodec()
|
||||
# mounting relevant client, for PX filter client must be mounted ahead
|
||||
if conf.testFunc == TesterFunctionality.SENDER:
|
||||
codec = WakuLightPushCodec
|
||||
else:
|
||||
codec = WakuFilterSubscribeCodec
|
||||
|
||||
var lookForServiceNode = false
|
||||
var serviceNodePeerInfo: RemotePeerInfo
|
||||
@ -241,6 +257,6 @@ when isMainModule:
|
||||
if conf.testFunc == TesterFunctionality.SENDER:
|
||||
setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo)
|
||||
else:
|
||||
setupAndSubscribe(wakuApp.node, conf, serviceNodePeerInfo)
|
||||
setupAndListen(wakuApp.node, conf, serviceNodePeerInfo)
|
||||
|
||||
runForever()
|
||||
|
||||
@ -24,8 +24,8 @@ def run_tester_node(predefined_test_env):
|
||||
return os.system(script_cmd)
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER"]:
|
||||
print("Error: First argument must be either 'RECEIVER' or 'SENDER'")
|
||||
if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER", "SENDERV3"]:
|
||||
print("Error: First argument must be either 'RECEIVER' or 'SENDER' or 'SENDERV3'")
|
||||
sys.exit(1)
|
||||
|
||||
predefined_test_env_file = '/usr/bin/infra.env'
|
||||
|
||||
@ -21,14 +21,17 @@ import
|
||||
./tester_message,
|
||||
./lpt_metrics,
|
||||
./diagnose_connections,
|
||||
./service_peer_management
|
||||
./service_peer_management,
|
||||
./publisher_base,
|
||||
./legacy_publisher,
|
||||
./v3_publisher
|
||||
|
||||
randomize()
|
||||
|
||||
type SizeRange* = tuple[min: uint64, max: uint64]
|
||||
|
||||
var RANDOM_PALYLOAD {.threadvar.}: seq[byte]
|
||||
RANDOM_PALYLOAD = urandom(1024 * 1024)
|
||||
var RANDOM_PAYLOAD {.threadvar.}: seq[byte]
|
||||
RANDOM_PAYLOAD = urandom(1024 * 1024)
|
||||
# 1MiB of random payload to be used to extend message
|
||||
|
||||
proc prepareMessage(
|
||||
@ -59,9 +62,8 @@ proc prepareMessage(
|
||||
if renderSize < len(contentPayload).uint64:
|
||||
renderSize = len(contentPayload).uint64
|
||||
|
||||
let finalPayload = concat(
|
||||
contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64]
|
||||
)
|
||||
let finalPayload =
|
||||
concat(contentPayload, RANDOM_PAYLOAD[0 .. renderSize - len(contentPayload).uint64])
|
||||
let message = WakuMessage(
|
||||
payload: finalPayload, # content of the message
|
||||
contentTopic: contentTopic, # content topic to publish to
|
||||
@ -108,6 +110,7 @@ proc reportSentMessages() =
|
||||
|
||||
proc publishMessages(
|
||||
wakuNode: WakuNode,
|
||||
publisher: PublisherBase,
|
||||
servicePeer: RemotePeerInfo,
|
||||
lightpushPubsubTopic: PubsubTopic,
|
||||
lightpushContentTopic: ContentTopic,
|
||||
@ -148,9 +151,7 @@ proc publishMessages(
|
||||
|
||||
let publishStartTime = Moment.now()
|
||||
|
||||
let wlpRes = await wakuNode.legacyLightpushPublish(
|
||||
some(lightpushPubsubTopic), message, actualServicePeer
|
||||
)
|
||||
let wlpRes = await publisher.send(lightpushPubsubTopic, message, actualServicePeer)
|
||||
|
||||
let publishDuration = Moment.now() - publishStartTime
|
||||
|
||||
@ -213,10 +214,13 @@ proc publishMessages(
|
||||
proc setupAndPublish*(
|
||||
wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo
|
||||
) =
|
||||
if isNil(wakuNode.wakuLightpushClient):
|
||||
# if we have not yet initialized lightpush client, then do it as the only way we can get here is
|
||||
# by having a service peer discovered.
|
||||
wakuNode.mountLegacyLightPushClient()
|
||||
var publisher: PublisherBase
|
||||
if conf.lightpushVersion == LightpushVersion.LEGACY:
|
||||
info "Using legacy lightpush protocol for publishing messages"
|
||||
publisher = LegacyPublisher.new(wakuNode)
|
||||
else:
|
||||
info "Using lightpush v3 protocol for publishing messages"
|
||||
publisher = V3Publisher.new(wakuNode)
|
||||
|
||||
# give some time to receiver side to set up
|
||||
let waitTillStartTesting = conf.startPublishingAfter.seconds
|
||||
@ -257,6 +261,7 @@ proc setupAndPublish*(
|
||||
# Start maintaining subscription
|
||||
asyncSpawn publishMessages(
|
||||
wakuNode,
|
||||
publisher,
|
||||
servicePeer,
|
||||
conf.getPubsubTopic(),
|
||||
conf.contentTopics[0],
|
||||
14
apps/liteprotocoltester/publisher_base.nim
Normal file
14
apps/liteprotocoltester/publisher_base.nim
Normal file
@ -0,0 +1,14 @@
|
||||
import chronos, results
|
||||
import waku/[waku_node, waku_core]
|
||||
|
||||
type PublisherBase* = ref object of RootObj
|
||||
wakuNode*: WakuNode
|
||||
|
||||
method send*(
|
||||
self: PublisherBase,
|
||||
topic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
servicePeer: RemotePeerInfo,
|
||||
): Future[Result[void, string]] {.base, async.} =
|
||||
discard
|
||||
# when error it must return original error desc due the text is used for distinction between error types in metrics.
|
||||
@ -116,7 +116,7 @@ proc maintainSubscription(
|
||||
|
||||
await sleepAsync(30.seconds) # Subscription maintenance interval
|
||||
|
||||
proc setupAndSubscribe*(
|
||||
proc setupAndListen*(
|
||||
wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo
|
||||
) =
|
||||
if isNil(wakuNode.wakuFilterClient):
|
||||
@ -25,7 +25,12 @@ fi
|
||||
|
||||
FUNCTION=$2
|
||||
if [ "${FUNCTION}" = "SENDER" ]; then
|
||||
FUNCTION=--test-func=SENDER
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
|
||||
SERVICENAME=lightpush-service
|
||||
fi
|
||||
|
||||
if [ "${FUNCTION}" = "SENDERV3" ]; then
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=V3"
|
||||
SERVICENAME=lightpush-service
|
||||
fi
|
||||
|
||||
|
||||
@ -26,7 +26,15 @@ fi
|
||||
|
||||
FUNCTION=$2
|
||||
if [ "${FUNCTION}" = "SENDER" ]; then
|
||||
FUNCTION=--test-func=SENDER
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
|
||||
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
|
||||
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
|
||||
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
|
||||
METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}"
|
||||
fi
|
||||
|
||||
if [ "${FUNCTION}" = "SENDERV3" ]; then
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=V3"
|
||||
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
|
||||
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
|
||||
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
|
||||
|
||||
@ -26,7 +26,15 @@ fi
|
||||
|
||||
FUNCTION=$2
|
||||
if [ "${FUNCTION}" = "SENDER" ]; then
|
||||
FUNCTION=--test-func=SENDER
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
|
||||
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
|
||||
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
|
||||
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
|
||||
METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}"
|
||||
fi
|
||||
|
||||
if [ "${FUNCTION}" = "SENDERV3" ]; then
|
||||
FUNCTION="--test-func=SENDER --lightpush-version=V3"
|
||||
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
|
||||
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
|
||||
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
|
||||
|
||||
@ -158,9 +158,7 @@ proc tryCallAllPxPeers*(
|
||||
proc pxLookupServiceNode*(
|
||||
node: WakuNode, conf: LiteProtocolTesterConf
|
||||
): Future[Result[bool, void]] {.async.} =
|
||||
var codec: string = WakuLightPushCodec
|
||||
if conf.testFunc == TesterFunctionality.RECEIVER:
|
||||
codec = WakuFilterSubscribeCodec
|
||||
let codec: string = conf.getCodec()
|
||||
|
||||
if node.wakuPeerExchange.isNil():
|
||||
let peerExchangeNode = translateToRemotePeerInfo(conf.bootstrapNode).valueOr:
|
||||
|
||||
@ -33,6 +33,10 @@ type TesterFunctionality* = enum
|
||||
SENDER # pumps messages to the network
|
||||
RECEIVER # gather and analyze messages from the network
|
||||
|
||||
type LightpushVersion* = enum
|
||||
LEGACY # legacy lightpush protocol
|
||||
V3 # lightpush v3 protocol
|
||||
|
||||
type LiteProtocolTesterConf* = object
|
||||
configFile* {.
|
||||
desc:
|
||||
@ -80,6 +84,12 @@ type LiteProtocolTesterConf* = object
|
||||
name: "test-func"
|
||||
.}: TesterFunctionality
|
||||
|
||||
lightpushVersion* {.
|
||||
desc: "Version of the sender to use. Supported values: legacy, v3.",
|
||||
defaultValue: LightpushVersion.LEGACY,
|
||||
name: "lightpush-version"
|
||||
.}: LightpushVersion
|
||||
|
||||
numMessages* {.
|
||||
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
|
||||
.}: uint32
|
||||
@ -190,4 +200,14 @@ proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] =
|
||||
proc getPubsubTopic*(conf: LiteProtocolTesterConf): PubsubTopic =
|
||||
return $RelayShard(clusterId: conf.clusterId, shardId: conf.shard)
|
||||
|
||||
proc getCodec*(conf: LiteProtocolTesterConf): string =
|
||||
return
|
||||
if conf.testFunc == TesterFunctionality.RECEIVER:
|
||||
WakuFilterSubscribeCodec
|
||||
else:
|
||||
if conf.lightpushVersion == LightpushVersion.LEGACY:
|
||||
WakuLegacyLightPushCodec
|
||||
else:
|
||||
WakuLightPushCodec
|
||||
|
||||
{.pop.}
|
||||
|
||||
29
apps/liteprotocoltester/v3_publisher.nim
Normal file
29
apps/liteprotocoltester/v3_publisher.nim
Normal file
@ -0,0 +1,29 @@
|
||||
import results, options, chronos
|
||||
import waku/[waku_node, waku_core, waku_lightpush]
|
||||
import publisher_base
|
||||
|
||||
type V3Publisher* = ref object of PublisherBase
|
||||
|
||||
proc new*(T: type V3Publisher, wakuNode: WakuNode): T =
|
||||
if isNil(wakuNode.wakuLightpushClient):
|
||||
wakuNode.mountLightPushClient()
|
||||
|
||||
return V3Publisher(wakuNode: wakuNode)
|
||||
|
||||
method send*(
|
||||
self: V3Publisher,
|
||||
topic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
servicePeer: RemotePeerInfo,
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
# when error it must return original error desc due the text is used for distinction between error types in metrics.
|
||||
discard (
|
||||
await self.wakuNode.lightpushPublish(some(topic), message, some(servicePeer))
|
||||
).valueOr:
|
||||
if error.code == NO_PEERS_TO_RELAY and
|
||||
error.desc != some("No peers for topic, skipping publish"):
|
||||
# TODO: We need better separation of errors happening on the client side or the server side.-
|
||||
return err("dial_failure")
|
||||
else:
|
||||
return err($error.code)
|
||||
return ok()
|
||||
Loading…
x
Reference in New Issue
Block a user