Merge pull request #65 from sartography/feature/auth_from_cullerton_branch

Feature/auth from cullerton branch
This commit is contained in:
jasquat 2022-07-29 16:09:55 -04:00 committed by GitHub
commit 1436092c11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 665 additions and 435 deletions

View File

@ -31,7 +31,14 @@ if [[ "${APPLICATION_ROOT:-}" != "/" ]]; then
additional_args="${additional_args} -e SCRIPT_NAME=${APPLICATION_ROOT}"
fi
# HACK: if loading fixtures for acceptance tests when we do not need multiple workers
# it causes issues with attempting to add duplicate data to the db
workers=3
if [[ "${SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA:-}" == "true" ]]; then
workers=1
fi
export IS_GUNICORN="true"
# THIS MUST BE THE LAST COMMAND!
exec poetry run gunicorn ${additional_args} --bind "0.0.0.0:$port" --workers=3 --timeout 90 --capture-output --access-logfile '-' --log-level debug wsgi:app
exec poetry run gunicorn ${additional_args} --bind "0.0.0.0:$port" --workers="$workers" --timeout 90 --capture-output --access-logfile '-' --log-level debug wsgi:app

View File

@ -24,7 +24,7 @@ paths:
tags:
- Authentication
responses:
'200':
"200":
description: Redirects to authentication server
/login_return:
parameters:
@ -49,7 +49,7 @@ paths:
tags:
- Authentication
responses:
'200':
"200":
description: Test Return Response
/logout:
parameters:
@ -70,7 +70,7 @@ paths:
tags:
- Authentication
responses:
'200':
"200":
description: Logout Authenticated User
/logout_return:
get:
@ -80,7 +80,7 @@ paths:
tags:
- Authentication
responses:
'200':
"200":
description: Logout Authenticated User
/login_swagger:
@ -110,12 +110,12 @@ paths:
tags:
- Authentication
responses:
'304':
"304":
description: Redirection to the hosted frontend with an auth_token header.
/status:
get:
# security: []
# security: []
operationId: spiffworkflow_backend.routes.process_api_blueprint.status
summary: Returns 200 if the server is Responding
tags:
@ -768,7 +768,7 @@ paths:
get:
tags:
- Tasks
# security: []
# security: []
operationId: spiffworkflow_backend.routes.process_api_blueprint.task_list_my_tasks
summary: returns the list of ready or waiting tasks for a user
responses:

View File

@ -14,7 +14,11 @@ CORS_ALLOW_ORIGINS = re.split(
)
# Keycloak server
KEYCLOAK_SERVER_URL = environ.get("KEYCLOAK_SERVER_URL", default="http://localhost:7002")
KEYCLOAK_SERVER_URL = environ.get(
"KEYCLOAK_SERVER_URL", default="http://localhost:7002"
)
KEYCLOAK_CLIENT_ID = environ.get("KEYCLOAK_CLIENT_ID", default="spiffworkflow-backend")
KEYCLOAK_REALM_NAME = environ.get("KEYCLOAK_REALM_NAME", default="spiffworkflow")
KEYCLOAK_CLIENT_SECRET_KEY = environ.get("KEYCLOAK_CLIENT_SECRET_KEY", default="JXeQExm0JhQPLumgHtIIqf52bDalHz0q") # noqa: S105
KEYCLOAK_CLIENT_SECRET_KEY = environ.get(
"KEYCLOAK_CLIENT_SECRET_KEY", default="JXeQExm0JhQPLumgHtIIqf52bDalHz0q"
) # noqa: S105

View File

@ -12,9 +12,7 @@ def find_or_create_user(username: str = "test_user1") -> Any:
"""Find_or_create_user."""
user = UserModel.query.filter_by(username=username).first()
if user is None:
user = UserModel(username=username,
service='local',
service_id=username)
user = UserModel(username=username, service="local", service_id=username)
db.session.add(user)
db.session.commit()

View File

@ -1,10 +1,7 @@
"""User."""
from typing import Union
import jwt
import marshmallow
from flask import current_app
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from marshmallow import Schema
@ -18,11 +15,7 @@ class UserModel(SpiffworkflowBaseDBModel):
"""UserModel."""
__tablename__ = "user"
__table_args__ = (
db.UniqueConstraint(
"service", "service_id", name="service_key"
),
)
__table_args__ = (db.UniqueConstraint("service", "service_id", name="service_key"),)
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(50), nullable=False, unique=True)
uid = db.Column(db.String(50), unique=True)
@ -57,7 +50,7 @@ class UserModel(SpiffworkflowBaseDBModel):
# 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=hours, minutes=0, seconds=0),
# 'iat': datetime.datetime.utcnow(),
"sub": f"service:{self.service}::service_id:{self.service_id}",
"token_type": "internal"
"token_type": "internal",
}
return jwt.encode(
payload,
@ -71,11 +64,12 @@ class UserModel(SpiffworkflowBaseDBModel):
@classmethod
def from_open_id_user_info(cls, user_info):
"""From_open_id_user_info."""
instance = cls()
instance.service = 'keycloak'
instance.service_id = user_info['sub']
instance.name = user_info['preferred_username']
instance.username = user_info['sub']
instance.service = "keycloak"
instance.service_id = user_info["sub"]
instance.name = user_info["preferred_username"]
instance.username = user_info["sub"]
return instance

View File

@ -1,22 +1,14 @@
"""APIs for dealing with process groups, process models, and process instances."""
from typing import Any
from typing import Union
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import redirect
from flask import render_template
from flask import request
from flask import url_for
from flask_bpmn.models.db import db
from werkzeug.wrappers.response import Response
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.process_instance_report import (
ProcessInstanceReportModel,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)

View File

@ -1,24 +1,18 @@
"""User."""
import jwt
import requests
import json
from urllib.parse import urlencode, quote
import base64
from typing import Dict
from typing import Optional
from flask import g
import jwt
from flask import current_app
from flask import g
from flask import redirect
from flask import request
from flask.app import Flask
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authentication_service import PublicAuthenticationService, get_keycloak_args
from spiffworkflow_backend.services.authentication_service import (
PublicAuthenticationService,
)
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
@ -27,6 +21,7 @@ from spiffworkflow_backend.services.user_service import UserService
:synopsis: Single Sign On (SSO) user login and session handlers
"""
def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
"""Verify the token for the user (if provided).
@ -42,34 +37,37 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
ApiError: If not on production and token is not valid, returns an 'invalid_token' 403 error.
If on production and user is not authenticated, returns a 'no_user' 403 error.
"""
if token:
user_info = None
token_type = get_token_type(token)
if token_type == 'id_token' :
if token_type == "id_token":
try:
user_info = AuthorizationService().get_user_info_from_id_token(token)
except ApiError as ae:
raise ae
except Exception as e:
current_app.logger.error(f"Exception raised in get_token: {e}")
raise ApiError(code="fail_get_user_info",
message="Cannot get user info from token")
raise ApiError(
code="fail_get_user_info", message="Cannot get user info from token"
)
if user_info and 'error' not in user_info: # not sure what to test yet
user_model = UserModel.query\
.filter(UserModel.service == 'keycloak')\
.filter(UserModel.service_id==user_info['sub'])\
if user_info and "error" not in user_info: # not sure what to test yet
user_model = (
UserModel.query.filter(UserModel.service == "keycloak")
.filter(UserModel.service_id == user_info["sub"])
.first()
)
if user_model is None:
# Do we ever get here any more, now that we have login_return method?
current_app.logger.debug("create_user in verify_token")
user_model = UserService().create_user(service='keycloak',
service_id=user_info['sub'],
name=user_info['name'],
username=user_info['preferred_username'],
email=user_info['email'])
user_model = UserService().create_user(
service="keycloak",
service_id=user_info["sub"],
name=user_info["name"],
username=user_info["preferred_username"],
email=user_info["email"],
)
if user_model:
g.user = user_model
@ -77,22 +75,18 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
if g.user:
g.token = token
scope = get_scope(token)
return {'uid': g.user.id,
'sub': g.user.id,
'scope': scope}
return {"uid": g.user.id, "sub": g.user.id, "scope": scope}
# return validate_scope(token, user_info, user_model)
else:
raise ApiError(code="no_user_id",
message="Cannot get a user id")
raise ApiError(code="no_user_id", message="Cannot get a user id")
# no user_info
else:
raise ApiError(code="no_user_info",
message="Cannot retrieve user info")
raise ApiError(code="no_user_info", message="Cannot retrieve user info")
# no token -- do we ever get here?
else:
if app.config.get('DEVELOPMENT'):
if app.config.get("DEVELOPMENT"):
# Fall back to a default user if this is not production.
g.user = UserModel.query.first()
if not g.user:
@ -105,11 +99,15 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
return token_info
else:
raise ApiError(code="no_auth_token",
message="No authorization token was available.",
status_code=401)
raise ApiError(
code="no_auth_token",
message="No authorization token was available.",
status_code=401,
)
def validate_scope(token) -> bool:
"""Validate_scope."""
print("validate_scope")
# token = AuthorizationService().refresh_token(token)
# user_info = AuthorizationService().get_user_info_from_public_access_token(token)
@ -119,80 +117,101 @@ def validate_scope(token) -> bool:
# introspection = AuthorizationService().introspect_token(basic_token)
return True
def api_login(uid, password, redirect_url=None):
def api_login(uid, password, redirect_url=None):
"""Api_login."""
token = PublicAuthenticationService().get_public_access_token(uid, password)
g.token = token
return token
def encode_auth_token(uid):
"""
Generates the Auth Token
:return: string
"""
payload = {
'sub': uid
}
payload = {"sub": uid}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256',
app.config.get("SECRET_KEY"),
algorithm="HS256",
)
def login(redirect_url='/'):
def login(redirect_url="/"):
"""Login."""
state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(state)
return redirect(login_redirect_url)
def login_return(code, state, session_state):
""""""
# TODO: Why does state look like this?
# 'b\'eydyZWRpcmVjdF91cmwnOiAnaHR0cDovL2xvY2FsaG9zdDo3MDAxLyd9\''
# It has an extra 'b at the beginning and an extra ' at the end,
# so we use state[2:-1]
state_dict = eval(base64.b64decode(state[2:-1]).decode('utf-8'))
state_redirect_url = state_dict['redirect_url']
state_dict = eval(base64.b64decode(state[2:-1]).decode("utf-8"))
state_redirect_url = state_dict["redirect_url"]
id_token_object = PublicAuthenticationService().get_id_token_object(code)
id_token = id_token_object['id_token']
id_token = id_token_object["id_token"]
if PublicAuthenticationService.validate_id_token(id_token):
user_info = AuthorizationService().get_user_info_from_id_token(id_token_object['access_token'])
if user_info and 'error' not in user_info:
user_model = UserModel.query.filter(UserModel.service == 'keycloak').filter(UserModel.service_id==user_info['sub']).first()
user_info = AuthorizationService().get_user_info_from_id_token(
id_token_object["access_token"]
)
if user_info and "error" not in user_info:
user_model = (
UserModel.query.filter(UserModel.service == "keycloak")
.filter(UserModel.service_id == user_info["sub"])
.first()
)
if user_model is None:
current_app.logger.debug("create_user in login_return")
name = username = email = ''
if 'name' in user_info:
name = user_info['name']
if 'username' in user_info:
username = user_info['username']
if 'email' in user_info:
email = user_info['email']
user_model = UserService().create_user(service='keycloak',
service_id=user_info['sub'],
name=name,
username=username,
email=email)
name = username = email = ""
if "name" in user_info:
name = user_info["name"]
if "username" in user_info:
username = user_info["username"]
if "email" in user_info:
email = user_info["email"]
user_model = UserService().create_user(
service="keycloak",
service_id=user_info["sub"],
name=name,
username=username,
email=email,
)
if user_model:
g.user = user_model.id
redirect_url = f"{state_redirect_url}?" + \
f"access_token={id_token_object['access_token']}&" + \
f"id_token={id_token}"
redirect_url = (
f"{state_redirect_url}?"
+ f"access_token={id_token_object['access_token']}&"
+ f"id_token={id_token}"
)
return redirect(redirect_url)
# return f"{code} {state} {id_token}"
def logout(id_token: str, redirect_url: str | None):
return PublicAuthenticationService().logout(id_token=id_token, redirect_url=redirect_url)
"""Logout."""
return PublicAuthenticationService().logout(
id_token=id_token, redirect_url=redirect_url
)
def logout_return():
"""Logout_return."""
return redirect(f"http://localhost:7001/")
def get_token_type(token) -> bool:
"""Get_token_type."""
token_type = None
try:
PublicAuthenticationService.validate_id_token(token)
@ -203,7 +222,7 @@ def get_token_type(token) -> bool:
except Exception as e:
print(f"Exception in get_token_type: {e}")
else:
token_type = 'id_token'
token_type = "id_token"
# try:
# # see if we have an open_id token
# decoded_token = AuthorizationService.decode_auth_token(token)
@ -211,11 +230,13 @@ def get_token_type(token) -> bool:
# if 'sub' in decoded_token and 'iss' in decoded_token and 'aud' in decoded_token:
# token_type = 'id_token'
# if 'token_type' in decoded_token and 'sub' in decoded_token:
# return True
# if 'token_type' in decoded_token and 'sub' in decoded_token:
# return True
return token_type
def get_scope(token):
"""Get_scope."""
decoded_token = jwt.decode(token, options={"verify_signature": False})
scope = decoded_token['scope']
scope = decoded_token["scope"]
return scope

View File

@ -5,7 +5,6 @@ from typing import Final
import flask.wrappers
from flask import Blueprint
from flask import current_app
from flask import request
from flask import Response
from flask_bpmn.api.api_error import ApiError
@ -13,7 +12,6 @@ from flask_bpmn.models.db import db
from sqlalchemy.exc import IntegrityError
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel

View File

@ -1,37 +1,42 @@
"""Authentication_service."""
import base64
import enum
import json
import time
from typing import Optional
import requests
import base64
import json
import enum
import random
import jwt
import time
from flask import g
import requests
from flask import current_app
from flask import redirect
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from keycloak import KeycloakOpenID # type: ignore
from keycloak.uma_permissions import AuthStatus # type: ignore
from keycloak import KeycloakAdmin
from spiffworkflow_backend.services.authorization_service import AuthorizationService
def get_keycloak_args():
keycloak_server_url = current_app.config['KEYCLOAK_SERVER_URL']
"""Get_keycloak_args."""
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
keycloak_client_secret_key = current_app.config["KEYCLOAK_CLIENT_SECRET_KEY"] # noqa: S105
return keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
keycloak_client_secret_key = current_app.config[
"KEYCLOAK_CLIENT_SECRET_KEY"
] # noqa: S105
return (
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
)
class AuthenticationServiceProviders(enum.Enum):
keycloak = 'keycloak'
internal = 'internal'
"""AuthenticationServiceProviders."""
keycloak = "keycloak"
internal = "internal"
class PublicAuthenticationService:
@ -39,46 +44,72 @@ class PublicAuthenticationService:
It uses a separate public keycloak client: spiffworkflow-frontend
Used during development to make testing easy.
"""
def logout(self, redirect_url: str='/', id_token: str | None=None):
if id_token is None:
raise ApiError(code='missing_id_token',
message="id_token is missing",
status_code=400)
return_redirect_url = 'http://localhost:7000/v1.0/logout_return'
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = get_keycloak_args()
def logout(self, redirect_url: str = "/", id_token: str | None = None):
"""Logout."""
if id_token is None:
raise ApiError(
code="missing_id_token", message="id_token is missing", status_code=400
)
return_redirect_url = "http://localhost:7000/v1.0/logout_return"
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = get_keycloak_args()
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/logout?post_logout_redirect_uri={return_redirect_url}&id_token_hint={id_token}"
return redirect(request_url)
@staticmethod
def generate_state(redirect_url):
state = base64.b64encode(bytes(str({'redirect_url': redirect_url}), 'UTF-8'))
"""Generate_state."""
state = base64.b64encode(bytes(str({"redirect_url": redirect_url}), "UTF-8"))
return state
def get_login_redirect_url(self, state):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = get_keycloak_args()
return_redirect_url = 'http://localhost:7000/v1.0/login_return'
login_redirect_url = f'{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth?' + \
f'state={state}&' + \
'response_type=code&' + \
f'client_id={keycloak_client_id}&' + \
'scope=openid&' + \
f'redirect_uri={return_redirect_url}'
"""Get_login_redirect_url."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = get_keycloak_args()
return_redirect_url = "http://localhost:7000/v1.0/login_return"
login_redirect_url = (
f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth?"
+ f"state={state}&"
+ "response_type=code&"
+ f"client_id={keycloak_client_id}&"
+ "scope=openid&"
+ f"redirect_uri={return_redirect_url}"
)
return login_redirect_url
def get_id_token_object(self, code):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = get_keycloak_args()
"""Get_id_token_object."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = get_keycloak_args()
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding="ascii")
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}",
}
data = {'grant_type': 'authorization_code',
'code': code,
'redirect_uri': 'http://localhost:7000/v1.0/login_return'}
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": "http://localhost:7000/v1.0/login_return",
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
@ -92,58 +123,80 @@ class PublicAuthenticationService:
https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation
"""
now = time.time()
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = get_keycloak_args()
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = get_keycloak_args()
try:
decoded_token = jwt.decode(id_token, options={"verify_signature": False})
except Exception as e:
raise ApiError(code='bad_id_token',
message="Cannot decode id_token",
status_code=401)
except Exception:
raise ApiError(
code="bad_id_token", message="Cannot decode id_token", status_code=401
)
try:
assert decoded_token['iss'] == f"{keycloak_server_url}/realms/{keycloak_realm_name}"
assert keycloak_client_id in decoded_token['aud'] or 'account' in decoded_token['aud']
if 'azp' in decoded_token:
assert (
decoded_token["iss"]
== f"{keycloak_server_url}/realms/{keycloak_realm_name}"
)
assert (
keycloak_client_id in decoded_token["aud"]
or "account" in decoded_token["aud"]
)
if "azp" in decoded_token:
# TODO: not sure why this isn't keycloak_client_id
assert decoded_token['azp'] in keycloak_client_id, 'account'
assert now > decoded_token['iat']
assert decoded_token["azp"] in keycloak_client_id, "account"
assert now > decoded_token["iat"]
except Exception as e:
current_app.logger.error(f"Exception validating id_token: {e}")
return False
try:
assert now < decoded_token['exp']
assert now < decoded_token["exp"]
except:
raise ApiError(code='invalid_token',
message="Your token is expired. Please Login",
status_code=401)
raise ApiError(
code="invalid_token",
message="Your token is expired. Please Login",
status_code=401,
)
return True
def get_public_access_token(self, username, password) -> dict:
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Get_public_access_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# Get public access token
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
post_data = {'grant_type': 'password',
'username': username,
'password': password,
'client_id': 'spiffworkflow-frontend'
}
post_data = {
"grant_type": "password",
"username": username,
"password": password,
"client_id": "spiffworkflow-frontend",
}
public_response = requests.post(request_url, headers=headers, data=post_data)
if public_response.status_code == 200:
public_token = json.loads(public_response.text)
if 'access_token' in public_token:
return public_token['access_token']
raise ApiError(code='no_public_access_token',
message=f"We could not get a public access token: {username}")
if "access_token" in public_token:
return public_token["access_token"]
raise ApiError(
code="no_public_access_token",
message=f"We could not get a public access token: {username}",
)
class AuthenticationService:
"""AuthenticationService."""
class KeycloakAuthenticationService:
"""KeycloakAuthenticationService."""
@staticmethod
def get_keycloak_openid(
@ -157,37 +210,39 @@ class KeycloakAuthenticationService:
client_secret_key=client_secret_key,
)
return keycloak_openid
#
# @staticmethod
# def get_keycloak_token(keycloak_openid, user, password):
# """Get_keycloak_token."""
# token = keycloak_openid.token(user, password)
# return token
#
# @staticmethod
# def get_permission_by_token(
# keycloak_openid: KeycloakOpenID, token: dict
# ) -> Optional[list[dict]]:
# """Get_permission_by_token."""
# # Get permissions by token
# # KEYCLOAK_PUBLIC_KEY = keycloak_openid.public_key()
# # KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
# # policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode',
# # key=KEYCLOAK_PUBLIC_KEY)
# permissions: list = keycloak_openid.get_permissions( # noqa: S106
# token["access_token"], method_token_info="introspect"
# )
# # TODO: Not sure if this is good. Permissions comes back as None
# return permissions
#
# @staticmethod
# def get_uma_permissions_by_token(
# keycloak_openid: KeycloakOpenID, token: dict
# ) -> Optional[list[dict]]:
# """Get_uma_permissions_by_token."""
# permissions: list = keycloak_openid.uma_permissions(token["access_token"])
# return permissions
#
#
# @staticmethod
# def get_keycloak_token(keycloak_openid, user, password):
# """Get_keycloak_token."""
# token = keycloak_openid.token(user, password)
# return token
#
# @staticmethod
# def get_permission_by_token(
# keycloak_openid: KeycloakOpenID, token: dict
# ) -> Optional[list[dict]]:
# """Get_permission_by_token."""
# # Get permissions by token
# # KEYCLOAK_PUBLIC_KEY = keycloak_openid.public_key()
# # KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
# # policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode',
# # key=KEYCLOAK_PUBLIC_KEY)
# permissions: list = keycloak_openid.get_permissions( # noqa: S106
# token["access_token"], method_token_info="introspect"
# )
# # TODO: Not sure if this is good. Permissions comes back as None
# return permissions
#
# @staticmethod
# def get_uma_permissions_by_token(
# keycloak_openid: KeycloakOpenID, token: dict
# ) -> Optional[list[dict]]:
# """Get_uma_permissions_by_token."""
# permissions: list = keycloak_openid.uma_permissions(token["access_token"])
# return permissions
#
@staticmethod
def get_uma_permissions_by_token_for_resource_and_scope(
keycloak_openid: KeycloakOpenID, token: dict, resource: str, scope: str
@ -197,6 +252,8 @@ class KeycloakAuthenticationService:
token["access_token"], permissions=f"{resource}#{scope}"
)
return permissions
#
# @staticmethod
# def get_auth_status_for_resource_and_scope_by_token(
@ -228,5 +285,6 @@ class KeycloakAuthenticationService:
class KeyCloak:
"""KeyCloak."""
"""Class to interact with KeyCloak server for authorization"""
"""Class to interact with KeyCloak server for authorization."""

View File

@ -1,35 +1,46 @@
"""Authorization_service."""
import requests
import base64
import json
import jwt
import enum
from flask import g
from flask import current_app
from flask_bpmn.api.api_error import ApiError
from typing import Union
from spiffworkflow_backend.models.user import UserModel
import jwt
import requests
from flask import current_app
from flask_bpmn.api.api_error import ApiError
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@staticmethod
def get_keycloak_args():
keycloak_server_url = current_app.config['KEYCLOAK_SERVER_URL']
"""Get_keycloak_args."""
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
keycloak_client_secret_key = current_app.config["KEYCLOAK_CLIENT_SECRET_KEY"] # noqa: S105
return keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
keycloak_client_secret_key = current_app.config[
"KEYCLOAK_CLIENT_SECRET_KEY"
] # noqa: S105
return (
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
)
def get_user_info_from_id_token(self, token):
"""This seems to work with basic tokens too"""
"""This seems to work with basic tokens too."""
json_data = None
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding="ascii")
base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
# headers = {"Content-Type": "application/x-www-form-urlencoded",
# "Authorization": f"Bearer {BACKEND_BASIC_AUTH.decode('utf-8')}"}
@ -46,11 +57,13 @@ class AuthorizationService:
# auth_bearer_string = f"Basic {keycloak_client_secret_key}"
headers = {"Authorization": f"Bearer {token}"}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
# "subject_token": id_token_object['access_token'],
"subject_token": token,
"audience": keycloak_client_id}
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": keycloak_client_id,
# "subject_token": id_token_object['access_token'],
"subject_token": token,
"audience": keycloak_client_id,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo"
try:
@ -60,38 +73,56 @@ class AuthorizationService:
if request_response.status_code == 200:
json_data = json.loads(request_response.text)
elif request_response.status_code == 401:
raise ApiError(code="invalid_token",
message="Please login",
status_code=401)
raise ApiError(
code="invalid_token", message="Please login", status_code=401
)
return json_data
def refresh_token(self, token):
"""Refresh_token."""
# if isinstance(token, str):
# token = eval(token)
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
data = {'grant_type': 'refresh_token',
'client_id': 'spiffworkflow-frontend',
'subject_token': token,
'refresh_token': token}
data = {
"grant_type": "refresh_token",
"client_id": "spiffworkflow-frontend",
"subject_token": token,
"refresh_token": token,
}
refresh_response = requests.post(request_url, headers=headers, data=data)
refresh_token = json.loads(refresh_response.text)
return refresh_token
def get_bearer_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Get_bearer_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding="ascii")
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
"subject_token": basic_token,
"audience": keycloak_client_id}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}",
}
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": keycloak_client_id,
"subject_token": basic_token,
"audience": keycloak_client_id,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
backend_response = requests.post(request_url, headers=headers, data=data)
@ -126,20 +157,31 @@ class AuthorizationService:
) from exception
def get_bearer_token_from_internal_token(self, internal_token):
decoded_token = self.decode_auth_token(internal_token)
"""Get_bearer_token_from_internal_token."""
self.decode_auth_token(internal_token)
print(f"get_user_by_internal_token: {internal_token}")
def introspect_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Introspect_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
'token': basic_token}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
data = {
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"token": basic_token,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect"
introspect_response = requests.post(request_url, headers=headers, data=data)
@ -148,7 +190,13 @@ class AuthorizationService:
return introspection
def get_permission_by_basic_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Get_permission_by_basic_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
@ -156,36 +204,49 @@ class AuthorizationService:
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
auth_bearer_string = f"Bearer {bearer_token}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"audience": keycloak_client_id,
"response_include_resource_name": True
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
data = {
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"response_mode": "permissions",
"audience": keycloak_client_id,
"response_include_resource_name": True,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission
def get_auth_status_for_resource_and_scope_by_token(self, basic_token, resource, scope):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
def get_auth_status_for_resource_and_scope_by_token(
self, basic_token, resource, scope
):
"""Get_auth_status_for_resource_and_scope_by_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
data = {
'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": f"{resource}#{scope}",
"response_mode": "permissions",
"audience": keycloak_client_id
"audience": keycloak_client_id,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
auth_response = requests.post(request_url, headers=headers, data=data)
@ -194,28 +255,39 @@ class AuthorizationService:
auth_status = json.loads(auth_response.text)
return auth_status
def get_permissions_by_token_for_resource_and_scope(self, basic_token, resource=None, scope=None):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
def get_permissions_by_token_for_resource_and_scope(
self, basic_token, resource=None, scope=None
):
"""Get_permissions_by_token_for_resource_and_scope."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
permision = ''
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
permision = ""
if resource:
permision += resource
if scope:
permision += '#' + resource
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"permission": permision,
"audience": keycloak_client_id,
"response_include_resource_name": True
permision += "#" + resource
data = {
"client_id": keycloak_client_id,
"client_secret": keycloak_client_secret_key,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"response_mode": "permissions",
"permission": permision,
"audience": keycloak_client_id,
"response_include_resource_name": True,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
@ -223,16 +295,26 @@ class AuthorizationService:
return permission
def get_resource_set(self, public_access_token, uri):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Get_resource_set."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
bearer_token = AuthorizationService().get_bearer_token(public_access_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/json",
"Authorization": auth_bearer_string}
data = {'matchingUri': 'true',
'deep': 'true',
'max': '-1',
'exactName': 'false',
'uri': uri}
headers = {
"Content-Type": "application/json",
"Authorization": auth_bearer_string,
}
data = {
"matchingUri": "true",
"deep": "true",
"max": "-1",
"exactName": "false",
"uri": uri,
}
# f"matchingUri=true&deep=true&max=-1&exactName=false&uri={URI_TO_TEST_AGAINST}"
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/authz/protection/resource_set"
@ -241,13 +323,23 @@ class AuthorizationService:
print("get_resource_set")
def get_permission_by_token(self, public_access_token) -> dict:
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
"""Get_permission_by_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = AuthorizationService.get_keycloak_args()
bearer_token = AuthorizationService().get_bearer_token(public_access_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"audience": keycloak_client_id}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string,
}
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"audience": keycloak_client_id,
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)

View File

@ -352,6 +352,18 @@ class ProcessInstanceProcessor:
if not self.bpmn_process_instance._is_engine_task(
ready_or_waiting_task.task_spec
):
user_id = ready_or_waiting_task.data['current_user']['id']
principal = PrincipalModel.query.filter_by(user_id=user_id).first()
if principal is None:
raise (
ApiError(
code="principal_not_found",
message=f"Principal not found from user id: {user_id}",
status_code=400,
)
)
extensions = ready_or_waiting_task.task_spec.extensions
form_file_name = None
@ -365,8 +377,7 @@ class ProcessInstanceProcessor:
active_task = ActiveTaskModel(
process_instance_id=self.process_instance_model.id,
# FIXME: look for the correct principal based on ready_or_waiting_task.lane
assigned_principal_id=PrincipalModel.query.first().id,
assigned_principal_id=principal.id,
form_file_name=form_file_name,
ui_form_file_name=ui_form_file_name,
task_id=str(ready_or_waiting_task.id),

View File

@ -15,15 +15,18 @@ class UserService:
"""Provides common tools for working with users."""
def create_user(self, service, service_id, name=None, username=None, email=None):
user = UserModel.query.filter(UserModel.service == service)\
.filter(UserModel.service_id == service_id)\
"""Create_user."""
user = (
UserModel.query.filter(UserModel.service == service)
.filter(UserModel.service_id == service_id)
.first()
)
if name is None:
name = ''
name = ""
if username is None:
username = ''
username = ""
if email is None:
email = ''
email = ""
if user is not None:
raise (
@ -34,24 +37,29 @@ class UserService:
)
)
user = UserModel(username=username,
service=service,
service_id=service_id,
name=name,
email=email)
user = UserModel(
username=username,
service=service,
service_id=service_id,
name=name,
email=email,
)
try:
db.session.add(user)
except IntegrityError as exception:
raise (
ApiError(code="integrity_error", message=repr(exception), status_code=500)
ApiError(
code="integrity_error", message=repr(exception), status_code=500
)
) from exception
try:
db.session.commit()
except Exception as e:
db.session.rollback()
raise ApiError(code='add_user_error',
message=f'Could not add user {username}') from e
raise ApiError(
code="add_user_error", message=f"Could not add user {username}"
) from e
try:
self.create_principal(user.id)
except ApiError as ae:
@ -234,6 +242,7 @@ class UserService:
)
def create_principal(self, user_id):
"""Create_principal."""
principal = PrincipalModel.query.filter_by(user_id=user_id).first()
if principal is None:
principal = PrincipalModel(user_id=user_id)
@ -243,6 +252,8 @@ class UserService:
except Exception as e:
db.session.rollback()
current_app.logger.error(f"Exception in create_principal: {e}")
raise ApiError(code="add_principal_error",
message=f"Could not create principal {user_id}") from e
raise ApiError(
code="add_principal_error",
message=f"Could not create principal {user_id}",
) from e
return principal

View File

@ -1,6 +1,9 @@
"""Base_test."""
from flask.app import Flask
from spiffworkflow_backend.services.authentication_service import PublicAuthenticationService
from spiffworkflow_backend.services.authentication_service import (
PublicAuthenticationService,
)
class BaseTest:
@ -9,15 +12,24 @@ class BaseTest:
@staticmethod
def get_keycloak_constants(app: Flask) -> tuple:
"""Get_keycloak_constants."""
keycloak_server_url = app.config['KEYCLOAK_SERVER_URL']
keycloak_server_url = app.config["KEYCLOAK_SERVER_URL"]
keycloak_client_id = app.config["KEYCLOAK_CLIENT_ID"]
keycloak_realm_name = app.config["KEYCLOAK_REALM_NAME"]
keycloak_client_secret_key = app.config["KEYCLOAK_CLIENT_SECRET_KEY"] # noqa: S105
keycloak_client_secret_key = app.config[
"KEYCLOAK_CLIENT_SECRET_KEY"
] # noqa: S105
return keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
return (
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
)
@staticmethod
def get_public_access_token(username, password) -> dict:
public_access_token = PublicAuthenticationService().get_public_access_token(username, password)
"""Get_public_access_token."""
public_access_token = PublicAuthenticationService().get_public_access_token(
username, password
)
return public_access_token

View File

@ -1,25 +1,17 @@
"""Test_authentication."""
import json
import requests
import base64
import urllib.parse
from typing import Any
import json
import requests
from flask.app import Flask
from flask.testing import FlaskClient
from keycloak.authorization import Authorization # type: ignore
from keycloak.keycloak_openid import KeycloakOpenID # type: ignore
from keycloak.uma_permissions import AuthStatus # type: ignore
from tests.spiffworkflow_backend.integration.base_test import BaseTest
from spiffworkflow_backend.services.authentication_service import PublicAuthenticationService, KeycloakAuthenticationService
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from urllib.parse import urlencode
class TestAuthentication(BaseTest):
"""TestAuthentication."""
# def test_get_basic_token(self, app: Flask) -> None:
# for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
@ -35,34 +27,45 @@ class TestAuthentication(BaseTest):
# assert isinstance(basic_token['scope'], str)
def test_get_token_script(self, app: Flask) -> None:
"""Test_get_token_script."""
print("Test Get Token Script")
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
keycloak_user = 'ciuser1'
keycloak_pass = 'ciuser1'
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = self.get_keycloak_constants(app)
keycloak_user = "ciuser1"
keycloak_pass = "ciuser1"
print(f"Test Get Token Script: keycloak_server_url: {keycloak_server_url}")
print(f"Test Get Token Script: keycloak_client_id: {keycloak_client_id}")
print(f"Test Get Token Script: keycloak_realm_name: {keycloak_realm_name}")
print(f"Test Get Token Script: keycloak_client_secret_key: {keycloak_client_secret_key}")
print(
f"Test Get Token Script: keycloak_client_secret_key: {keycloak_client_secret_key}"
)
frontend_client_id = 'spiffworkflow-frontend'
frontend_client_id = "spiffworkflow-frontend"
print(f"Test Get Token Script: frontend_client_id: {frontend_client_id}")
# Get frontend token
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
post_data = {'grant_type': 'password',
'username': keycloak_user,
'password': keycloak_pass,
'client_id': frontend_client_id
}
post_data = {
"grant_type": "password",
"username": keycloak_user,
"password": keycloak_pass,
"client_id": frontend_client_id,
}
print(f"Test Get Token Script: request_url: {request_url}")
print(f"Test Get Token Script: headers: {headers}")
print(f"Test Get Token Script: post_data: {post_data}")
frontend_response = requests.post(request_url, headers=headers, json=post_data, data=post_data)
frontend_response = requests.post(
request_url, headers=headers, json=post_data, data=post_data
)
frontend_token = json.loads(frontend_response.text)
print(f"Test Get Token Script: frontend_response: {frontend_response}")
@ -77,37 +80,44 @@ class TestAuthentication(BaseTest):
# Get backend token
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding="ascii")
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
"subject_token": frontend_token['access_token'],
"audience": keycloak_client_id}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}",
}
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": keycloak_client_id,
"subject_token": frontend_token["access_token"],
"audience": keycloak_client_id,
}
print(f"Test Get Token Script: request_url: {request_url}")
print(f"Test Get Token Script: headers: {headers}")
print(f"Test Get Token Script: data: {data}")
backend_response = requests.post(request_url, headers=headers, data=data)
json_data = json.loads(backend_response.text)
backend_token = json_data['access_token']
backend_token = json_data["access_token"]
print(f"Test Get Token Script: backend_response: {backend_response}")
print(f"Test Get Token Script: backend_token: {backend_token}")
if backend_token:
# Getting resource set
auth_bearer_string = f"Bearer {backend_token}"
headers = {"Content-Type": "application/json",
"Authorization": auth_bearer_string}
headers = {
"Content-Type": "application/json",
"Authorization": auth_bearer_string,
}
# URI_TO_TEST_AGAINST = "%2Fprocess-models"
URI_TO_TEST_AGAINST = "/status"
request_url = \
f"{keycloak_server_url}/realms/{keycloak_realm_name}/authz/protection/resource_set?" + \
f"matchingUri=true&deep=true&max=-1&exactName=false&uri={URI_TO_TEST_AGAINST}"
request_url = (
f"{keycloak_server_url}/realms/{keycloak_realm_name}/authz/protection/resource_set?"
+ f"matchingUri=true&deep=true&max=-1&exactName=false&uri={URI_TO_TEST_AGAINST}"
)
# f"uri={URI_TO_TEST_AGAINST}"
print(f"Test Get Token Script: request_url: {request_url}")
print(f"Test Get Token Script: headers: {headers}")
@ -118,35 +128,43 @@ class TestAuthentication(BaseTest):
json_data = json.loads(resource_result.text)
resource_id_name_pairs = []
for result in json_data:
if '_id' in result and result['_id']:
pair_key = result['_id']
if 'name' in result and result['name']:
pair_value = result['name']
if "_id" in result and result["_id"]:
pair_key = result["_id"]
if "name" in result and result["name"]:
pair_value = result["name"]
# pair = {{result['_id']}: {}}
else:
pair_value = 'no_name'
pair_value = "no_name"
# pair = {{result['_id']}: }
pair = [pair_key, pair_value]
resource_id_name_pairs.append(pair)
print(f"Test Get Token Script: resource_id_name_pairs: {resource_id_name_pairs}")
print(
f"Test Get Token Script: resource_id_name_pairs: {resource_id_name_pairs}"
)
# Getting Permissions
for resource_id_name_pair in resource_id_name_pairs:
resource_id = resource_id_name_pair[0]
resource_name = resource_id_name_pair[1]
resource_id_name_pair[1]
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}",
}
post_data = {"audience": keycloak_client_id,
"permission": resource_id,
"subject_token": backend_token,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket"}
post_data = {
"audience": keycloak_client_id,
"permission": resource_id,
"subject_token": backend_token,
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
}
print(f"Test Get Token Script: headers: {headers}")
print(f"Test Get Token Script: post_data: {post_data}")
print(f"Test Get Token Script: request_url: {request_url}")
permission_result = requests.post(request_url, headers=headers, data=post_data)
permission_result = requests.post(
request_url, headers=headers, data=post_data
)
print(f"Test Get Token Script: permission_result: {permission_result}")
print("test_get_token_script")
@ -155,6 +173,7 @@ class TestAuthentication(BaseTest):
# keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth"
# class TestOtherStuff(BaseTest):
#
# # def test_get_backend_token(self, app: Flask) -> None:
@ -286,33 +305,33 @@ class TestAuthentication(BaseTest):
# assert len(permission["scopes"]) == 1
# assert permission["scopes"][0] == "account:view"
#
# def test_get_uma_permissions_by_token_for_resource_and_scope(self, app: Flask) -> None:
# """Test_get_uma_permissions_by_token_for_resource_and_scope."""
# keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
# keycloak_openid = KeycloakAuthenticationService.get_keycloak_openid(
# keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
# )
# token = keycloak_openid.token('admin_1', 'admin_1')
# resource = "Process Groups"
# scope = "read"
#
# permissions = (
# KeycloakAuthenticationService.get_uma_permissions_by_token_for_resource_and_scope(
# keycloak_openid, token, resource, scope
# )
# )
# assert isinstance(permissions, list)
# # assert len(permissions) == 1
# assert isinstance(permissions[0], dict)
# permission = permissions[0]
# assert "rsname" in permission
# assert permission["rsname"] == resource
# assert "scopes" in permission
# assert isinstance(permission["scopes"], list)
# assert len(permission["scopes"]) == 1
# assert permission["scopes"][0] == scope
#
# print("test_get_uma_permissions_by_token_for_resource_and_scope")
# def test_get_uma_permissions_by_token_for_resource_and_scope(self, app: Flask) -> None:
# """Test_get_uma_permissions_by_token_for_resource_and_scope."""
# keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
# keycloak_openid = KeycloakAuthenticationService.get_keycloak_openid(
# keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
# )
# token = keycloak_openid.token('admin_1', 'admin_1')
# resource = "Process Groups"
# scope = "read"
#
# permissions = (
# KeycloakAuthenticationService.get_uma_permissions_by_token_for_resource_and_scope(
# keycloak_openid, token, resource, scope
# )
# )
# assert isinstance(permissions, list)
# # assert len(permissions) == 1
# assert isinstance(permissions[0], dict)
# permission = permissions[0]
# assert "rsname" in permission
# assert permission["rsname"] == resource
# assert "scopes" in permission
# assert isinstance(permission["scopes"], list)
# assert len(permission["scopes"]) == 1
# assert permission["scopes"][0] == scope
#
# print("test_get_uma_permissions_by_token_for_resource_and_scope")
#
# def test_get_auth_status_for_resource_and_scope_by_token(self, app: Flask) -> None:
# """Test_get_auth_status_for_resource_and_scope_by_token."""

View File

@ -1,8 +1,6 @@
"""Test_authorization."""
import requests # type: ignore
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.integration.base_test import BaseTest
from spiffworkflow_backend.services.authorization_service import AuthorizationService
@ -12,82 +10,95 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationSe
class TestAuthorization(BaseTest):
"""TestAuthorization."""
def test_get_bearer_token(self, app: Flask) -> None:
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
"""Test_get_bearer_token."""
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
public_access_token = self.get_public_access_token(user_id, user_id)
bearer_token = AuthorizationService().get_bearer_token(public_access_token)
assert isinstance(public_access_token, str)
assert isinstance(bearer_token, dict)
assert 'access_token' in bearer_token
assert isinstance(bearer_token['access_token'], str)
assert 'refresh_token' in bearer_token
assert isinstance(bearer_token['refresh_token'], str)
assert 'token_type' in bearer_token
assert bearer_token['token_type'] == 'Bearer'
assert 'scope' in bearer_token
assert isinstance(bearer_token['scope'], str)
assert "access_token" in bearer_token
assert isinstance(bearer_token["access_token"], str)
assert "refresh_token" in bearer_token
assert isinstance(bearer_token["refresh_token"], str)
assert "token_type" in bearer_token
assert bearer_token["token_type"] == "Bearer"
assert "scope" in bearer_token
assert isinstance(bearer_token["scope"], str)
def test_get_user_info_from_public_access_token(self, app: Flask) -> None:
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
"""Test_get_user_info_from_public_access_token."""
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
public_access_token = self.get_public_access_token(user_id, user_id)
user_info = AuthorizationService().get_user_info_from_public_access_token(public_access_token)
assert 'sub' in user_info
assert isinstance(user_info['sub'], str)
assert len(user_info['sub']) == 36
assert 'preferred_username' in user_info
assert user_info['preferred_username'] == user_id
assert 'email' in user_info
assert user_info['email'] == f"{user_id}@example.com"
user_info = AuthorizationService().get_user_info_from_public_access_token(
public_access_token
)
assert "sub" in user_info
assert isinstance(user_info["sub"], str)
assert len(user_info["sub"]) == 36
assert "preferred_username" in user_info
assert user_info["preferred_username"] == user_id
assert "email" in user_info
assert user_info["email"] == f"{user_id}@example.com"
def test_introspect_token(self, app: Flask) -> None:
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
"""Test_introspect_token."""
(
keycloak_server_url,
keycloak_client_id,
keycloak_realm_name,
keycloak_client_secret_key,
) = self.get_keycloak_constants(app)
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
basic_token = self.get_public_access_token(user_id, user_id)
introspection = AuthorizationService().introspect_token(basic_token)
assert isinstance(introspection, dict)
assert introspection['typ'] == 'Bearer'
assert introspection['preferred_username'] == user_id
assert introspection['client_id'] == 'spiffworkflow-frontend'
assert introspection["typ"] == "Bearer"
assert introspection["preferred_username"] == user_id
assert introspection["client_id"] == "spiffworkflow-frontend"
assert 'resource_access' in introspection
resource_access = introspection['resource_access']
assert "resource_access" in introspection
resource_access = introspection["resource_access"]
assert isinstance(resource_access, dict)
assert keycloak_client_id in resource_access
client = resource_access[keycloak_client_id]
assert 'roles' in client
roles = client['roles']
assert "roles" in client
roles = client["roles"]
assert isinstance(roles, list)
if user_id == 'admin_1':
if user_id == "admin_1":
assert len(roles) == 2
for role in roles:
assert role in ('User', 'Admin')
elif user_id == 'admin_2':
assert role in ("User", "Admin")
elif user_id == "admin_2":
assert len(roles) == 1
assert roles[0] == 'User'
elif user_id == 'user_1' or user_id == 'user_2':
assert roles[0] == "User"
elif user_id == "user_1" or user_id == "user_2":
assert len(roles) == 2
for role in roles:
assert role in ('User', 'Anonymous')
assert role in ("User", "Anonymous")
def test_get_permission_by_token(self, app: Flask) -> None:
resource_names = 'Default Resource', 'Process Groups', 'Process Models'
"""Test_get_permission_by_token."""
output = {}
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
output[user_id] = {}
basic_token = self.get_public_access_token(user_id, user_id)
permissions = AuthorizationService().get_permission_by_basic_token(basic_token)
permissions = AuthorizationService().get_permission_by_basic_token(
basic_token
)
if isinstance(permissions, list):
for permission in permissions:
resource_name = permission['rsname']
resource_name = permission["rsname"]
output[user_id][resource_name] = {}
# assert resource_name in resource_names
# if resource_name == 'Process Groups' or resource_name == 'Process Models':
if 'scopes' in permission:
if "scopes" in permission:
# assert 'scopes' in permission
scopes = permission['scopes']
output[user_id][resource_name]['scopes'] = scopes
scopes = permission["scopes"]
output[user_id][resource_name]["scopes"] = scopes
# assert isinstance(scopes, list)
# assert len(scopes) == 1
# assert scopes[0] == 'read'
@ -128,15 +139,16 @@ class TestAuthorization(BaseTest):
print("test_get_permission_by_token")
def test_get_auth_status_for_resource_and_scope_by_token(self, app: Flask) -> None:
resources = 'Admin', 'Process Groups', 'Process Models'
"""Test_get_auth_status_for_resource_and_scope_by_token."""
resources = "Admin", "Process Groups", "Process Models"
# scope = 'read'
output = {}
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
output[user_id] = {}
basic_token = self.get_public_access_token(user_id, user_id)
for resource in resources:
output[user_id][resource] = {}
for scope in 'instantiate', 'read', 'update', 'delete':
for scope in "instantiate", "read", "update", "delete":
auth_status = AuthorizationService().get_auth_status_for_resource_and_scope_by_token(
basic_token, resource, scope
)
@ -144,20 +156,21 @@ class TestAuthorization(BaseTest):
print("test_get_auth_status_for_resource_and_scope_by_token")
def test_get_permissions_by_token_for_resource_and_scope(self, app: Flask):
resource_names = 'Default Resource', 'Process Groups', 'Process Models'
"""Test_get_permissions_by_token_for_resource_and_scope."""
resource_names = "Default Resource", "Process Groups", "Process Models"
output = {}
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
output[user_id] = {}
basic_token = self.get_public_access_token(user_id, user_id)
for resource in resource_names:
output[user_id][resource] = {}
for scope in 'instantiate', 'read', 'update', 'delete':
permissions = AuthorizationService().\
get_permissions_by_token_for_resource_and_scope(basic_token, resource, scope)
for scope in "instantiate", "read", "update", "delete":
permissions = AuthorizationService().get_permissions_by_token_for_resource_and_scope(
basic_token, resource, scope
)
output[user_id][resource][scope] = permissions
print("test_get_permissions_by_token_for_resource_and_scope")
# # def test_authorize_action(self, app: Flask, client: FlaskClient) -> None:
# # action = 'my_action'
# # result = app.get(f'/authorize/{action}')