spiff-arena/spiffworkflow-backend/bin/get_token

84 lines
2.8 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
import base64
import json
import os
import re
import sys
from typing import Any
import requests
def get_argv(index: int, default: Any = None) -> Any:
    """Return the command-line argument at position ``index``.

    Args:
        index: Position to read from ``sys.argv``.
        default: Value returned when ``sys.argv`` has no element at ``index``.

    Returns:
        The argument found at ``index``, or ``default`` if that position does
        not exist.
    """
    try:
        value = sys.argv[index]
    except IndexError:
        # Argument was not supplied on the command line.
        value = default
    return value
# Positional command-line arguments, each with a development default:
#   1: username, 2: password, 3: realm name.
username = get_argv(1, "admin")
password = get_argv(2, "admin")
realm_name = get_argv(3, "spiffworkflow")

# Suffix appended to the username to build the "code" field of the token request.
OPEN_ID_CODE = ":this_is_not_secure_do_not_use_in_production"

backend_base_url = os.getenv("BACKEND_BASE_URL", "http://localhost:7000")
backend_client_id = os.getenv("BACKEND_CLIENT_ID", "spiffworkflow-backend")
# Read the conventionally upper-cased variable first. The mixed-case spelling
# that this script used previously is still honored so existing setups keep
# working on platforms where environment variable names are case-sensitive.
backend_client_secret = os.getenv(
    "BACKEND_CLIENT_SECRET",
    os.getenv("BACKEND_CLIENT_secret", "JXeQExm0JhQPLumgHtIIqf52bDalHz0q"),
)

# An explicit OPENID_TOKEN_URL wins. Otherwise the URL is derived from
# KEYCLOAK_BASE_URL, which in turn is guessed from the backend URL when unset.
openid_token_url = os.getenv("OPENID_TOKEN_URL")
keycloak_base_url = os.getenv("KEYCLOAK_BASE_URL")

if openid_token_url is None:
    if keycloak_base_url is None:
        if "spiffworkflow.org" in backend_base_url:
            # Capture "<env>.spiffworkflow.org" from a host of the form
            # "api.<env>.spiffworkflow.org".
            pattern = r".*api\.(\w+\.spiffworkflow\.org).*"
            match = re.search(pattern, backend_base_url)
            if match is None:
                raise Exception("Could not determine openid url based on backend url")
            env_domain = match.group(1)
            keycloak_base_url = f"https://keycloak.{env_domain}"
        elif "localhost:7000" in backend_base_url:
            keycloak_base_url = "http://localhost:7002"

    # Only build the token URL when a base URL is actually known. Otherwise
    # leave it as None so the caller can report the missing configuration
    # instead of posting to a bogus "None/realms/..." address.
    if keycloak_base_url is not None:
        openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token"
def get_auth_token_object() -> dict:
    """Request tokens from the OpenID token endpoint and return the decoded JSON body.

    Sends a form-encoded POST using the password grant, with the client
    identified through an HTTP Basic ``Authorization`` header built from
    ``backend_client_id`` and ``backend_client_secret``.

    Returns:
        The JSON document from the response body, parsed into a dict.

    Raises:
        Exception: If ``openid_token_url`` is None.
    """
    # HTTP Basic credentials: base64 of "client_id:client_secret".
    raw_credentials = f"{backend_client_id}:{backend_client_secret}".encode("ascii")
    encoded_credentials = base64.b64encode(raw_credentials).decode("utf-8")

    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": f"Basic {encoded_credentials}",
    }
    form_fields = {
        "grant_type": "password",
        "code": username + OPEN_ID_CODE,
        "username": username,
        "password": password,
        "client_id": backend_client_id,
    }

    if openid_token_url is None:
        raise Exception("Please specify the OPENID_TOKEN_URL")

    response = requests.post(
        openid_token_url,
        data=form_fields,
        headers=request_headers,
        timeout=15,
    )
    token_payload: dict = json.loads(response.text)
    return token_payload
# ruff: noqa: T201
# Emit every field of the token response. Field labels and all values other
# than the access token go to stderr, so stdout carries only the bare access
# token.
for token_identifier, token in get_auth_token_object().items():
    print(f"{token_identifier}:", file=sys.stderr)

    if token_identifier != "access_token":  # noqa: S105
        # Everything that is not the access token is informational only.
        print(f"{token}\n", file=sys.stderr)
        continue

    # Write the access token to stdout without a trailing newline and flush
    # right away, before the separator is written to stderr.
    print(token, end="", flush=True)
    print("\n", file=sys.stderr)