mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-03-04 02:50:38 +00:00
load script
This commit is contained in:
parent
59a66d8851
commit
c9d7686e06
@ -7,22 +7,21 @@ import json
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def run_curl_command(message_identifier, username="admin", password="admin", realm_name="spiffworkflow"):
|
||||
def get_access_token(script_dir, username="admin", password="admin", realm_name="spiffworkflow"):
|
||||
"""
|
||||
Get access token once
|
||||
"""
|
||||
get_token_cmd = f"{script_dir}/get_token {username} {password} {realm_name}"
|
||||
return subprocess.check_output(get_token_cmd, shell=True, text=True).strip()
|
||||
|
||||
|
||||
def run_curl_command(message_identifier, access_token, backend_base_url):
|
||||
"""
|
||||
Execute the curl command for load testing
|
||||
|
||||
:return: Tuple of (success, result)
|
||||
"""
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
try:
|
||||
# Get access token
|
||||
get_token_cmd = f"{script_dir}/get_token {username} {password} {realm_name}"
|
||||
access_token = subprocess.check_output(get_token_cmd, shell=True, text=True).strip()
|
||||
|
||||
# Backend URL (use environment variable or default)
|
||||
backend_base_url = os.environ.get("BACKEND_BASE_URL", "http://localhost:7000")
|
||||
|
||||
# Login command
|
||||
login_cmd = f"curl --silent -X POST '{backend_base_url}/v1.0/login_with_access_token?access_token={access_token}' -H 'Authorization: Bearer {access_token}' >/dev/null"
|
||||
subprocess.run(login_cmd, shell=True, check=True)
|
||||
@ -51,6 +50,12 @@ def load_test(message_identifier, num_requests=10, max_workers=5, username="admi
|
||||
"""
|
||||
Perform load testing with concurrent requests and failure logging
|
||||
"""
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
backend_base_url = os.environ.get("BACKEND_BASE_URL", "http://localhost:7000")
|
||||
|
||||
# Get access token once
|
||||
access_token = get_access_token(script_dir, username, password, realm_name)
|
||||
|
||||
successful_requests = 0
|
||||
failed_requests = 0
|
||||
failure_log = []
|
||||
@ -59,7 +64,7 @@ def load_test(message_identifier, num_requests=10, max_workers=5, username="admi
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
# Create futures for all requests
|
||||
futures = [
|
||||
executor.submit(run_curl_command, message_identifier, username, password, realm_name) for _ in range(num_requests)
|
||||
executor.submit(run_curl_command, message_identifier, access_token, backend_base_url) for _ in range(num_requests)
|
||||
]
|
||||
|
||||
# Collect and process results
|
||||
|
Loading…
x
Reference in New Issue
Block a user