84 lines
2.8 KiB
Python
Executable File
84 lines
2.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
|
|
def get_argv(index: int, default: Any = None) -> Any:
|
|
try:
|
|
return sys.argv[index]
|
|
except IndexError:
|
|
return default
|
|
|
|
|
|
username = get_argv(1, "admin")
|
|
password = get_argv(2, "admin")
|
|
realm_name = get_argv(3, "spiffworkflow")
|
|
|
|
OPEN_ID_CODE = ":this_is_not_secure_do_not_use_in_production"
|
|
|
|
backend_base_url = os.getenv("BACKEND_BASE_URL", "http://localhost:7000")
|
|
backend_client_id = os.getenv("BACKEND_CLIENT_ID", "spiffworkflow-backend")
|
|
backend_client_secret = os.getenv("BACKEND_CLIENT_secret", "JXeQExm0JhQPLumgHtIIqf52bDalHz0q")
|
|
|
|
openid_token_url = os.getenv("OPENID_TOKEN_URL")
|
|
keycloak_base_url = os.getenv("KEYCLOAK_BASE_URL")
|
|
if openid_token_url is None:
|
|
if keycloak_base_url is None:
|
|
if "spiffworkflow.org" in backend_base_url:
|
|
pattern = r".*api\.(\w+\.spiffworkflow.org).*"
|
|
match = re.search(pattern, backend_base_url)
|
|
if match is None:
|
|
raise Exception("Could not determine openid url based on backend url")
|
|
env_domain = match.group(1)
|
|
keycloak_base_url = "https://keycloak.${env_domain}"
|
|
elif "localhost:7000" in backend_base_url:
|
|
keycloak_base_url = "http://localhost:7002"
|
|
openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token"
|
|
|
|
|
|
def get_auth_token_object() -> dict:
|
|
backend_basic_auth_string = f"{backend_client_id}:{backend_client_secret}"
|
|
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
|
|
backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"Authorization": f"Basic {backend_basic_auth.decode('utf-8')}",
|
|
}
|
|
data = {
|
|
"grant_type": "password",
|
|
"code": username + OPEN_ID_CODE,
|
|
"username": username,
|
|
"password": password,
|
|
"client_id": backend_client_id,
|
|
}
|
|
|
|
if openid_token_url is None:
|
|
raise Exception("Please specify the OPENID_TOKEN_URL")
|
|
|
|
response = requests.post(openid_token_url, data=data, headers=headers, timeout=15)
|
|
auth_token_object: dict = json.loads(response.text)
|
|
return auth_token_object
|
|
|
|
|
|
# ruff: noqa: T201
|
|
|
|
for token_identifier, token in get_auth_token_object().items():
|
|
# if the k is access_token, print just it to stdout
|
|
print(f"{token_identifier}:", file=sys.stderr)
|
|
if token_identifier == "access_token": # noqa: S105
|
|
# print token with no newline
|
|
print(token, end="")
|
|
# flush the buffer to stdout
|
|
sys.stdout.flush()
|
|
print("\n", file=sys.stderr)
|
|
else:
|
|
# print the rest of the key value pairs to stderr
|
|
print(f"{token}\n", file=sys.stderr)
|