Adding a blueprint for openid - a very lightweight embedded authentication system to make it eaiser to try out SpiffWorkflow when you don't have openID set up with Google etal.
Removing all calls to open id's user_info endpoint - as these are unncessiary. Adding a users section to the permission files -- so we can handle all user/group/permissions in one file when needed. There was a very confusing is_admin function on the user model that needed killin.
This commit is contained in:
parent
ec0c6f4555
commit
975b961632
|
@ -19,6 +19,7 @@ from werkzeug.exceptions import NotFound
|
||||||
import spiffworkflow_backend.load_database_models # noqa: F401
|
import spiffworkflow_backend.load_database_models # noqa: F401
|
||||||
from spiffworkflow_backend.config import setup_config
|
from spiffworkflow_backend.config import setup_config
|
||||||
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
|
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
|
||||||
|
from spiffworkflow_backend.routes.openid_blueprint.openid_blueprint import openid_blueprint
|
||||||
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
|
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
|
||||||
from spiffworkflow_backend.routes.user import verify_token
|
from spiffworkflow_backend.routes.user import verify_token
|
||||||
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
|
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
|
||||||
|
@ -103,6 +104,7 @@ def create_app() -> flask.app.Flask:
|
||||||
app.register_blueprint(process_api_blueprint)
|
app.register_blueprint(process_api_blueprint)
|
||||||
app.register_blueprint(api_error_blueprint)
|
app.register_blueprint(api_error_blueprint)
|
||||||
app.register_blueprint(admin_blueprint, url_prefix="/admin")
|
app.register_blueprint(admin_blueprint, url_prefix="/admin")
|
||||||
|
app.register_blueprint(openid_blueprint, url_prefix="/openid")
|
||||||
|
|
||||||
origins_re = [
|
origins_re = [
|
||||||
r"^https?:\/\/%s(.*)" % o.replace(".", r"\.")
|
r"^https?:\/\/%s(.*)" % o.replace(".", r"\.")
|
||||||
|
|
|
@ -1,5 +1,15 @@
|
||||||
default_group: everybody
|
default_group: everybody
|
||||||
|
|
||||||
|
users:
|
||||||
|
admin:
|
||||||
|
email: admin@spiffworkflow.org
|
||||||
|
password: admin
|
||||||
|
dan:
|
||||||
|
email: dan@spiffworkflow.org
|
||||||
|
password: password
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
groups:
|
groups:
|
||||||
admin:
|
admin:
|
||||||
users:
|
users:
|
||||||
|
|
|
@ -83,10 +83,6 @@ class UserModel(SpiffworkflowBaseDBModel):
|
||||||
algorithm="HS256",
|
algorithm="HS256",
|
||||||
)
|
)
|
||||||
|
|
||||||
def is_admin(self) -> bool:
|
|
||||||
"""Is_admin."""
|
|
||||||
return True
|
|
||||||
|
|
||||||
# @classmethod
|
# @classmethod
|
||||||
# def from_open_id_user_info(cls, user_info: dict) -> Any:
|
# def from_open_id_user_info(cls, user_info: dict) -> Any:
|
||||||
# """From_open_id_user_info."""
|
# """From_open_id_user_info."""
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
"""__init__."""
|
|
@ -0,0 +1,116 @@
|
||||||
|
"""
|
||||||
|
Provides the bare minimum endpoints required by SpiffWorkflow to
|
||||||
|
handle openid authentication -- definitely not a production system.
|
||||||
|
This is just here to make local development, testing, and
|
||||||
|
demonstration easier.
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import time
|
||||||
|
import urllib
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
import jwt
|
||||||
|
import yaml
|
||||||
|
from flask import Blueprint, render_template, request, current_app, redirect, url_for, g
|
||||||
|
|
||||||
|
openid_blueprint = Blueprint(
|
||||||
|
"openid", __name__, template_folder="templates", static_folder="static"
|
||||||
|
)
|
||||||
|
|
||||||
|
MY_SECRET_CODE = ":this_should_be_some_crazy_code_different_all_the_time"
|
||||||
|
|
||||||
|
@openid_blueprint.route("/well-known/openid-configuration", methods=["GET"])
|
||||||
|
def well_known():
|
||||||
|
"""OpenID Discovery endpoint -- as these urls can be very different from system to system,
|
||||||
|
this is just a small subset."""
|
||||||
|
host_url = request.host_url.strip('/')
|
||||||
|
return {
|
||||||
|
"issuer": f"{host_url}/openid",
|
||||||
|
"authorization_endpoint": f"{host_url}{url_for('openid.auth')}",
|
||||||
|
"token_endpoint": f"{host_url}{url_for('openid.token')}",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@openid_blueprint.route("/auth", methods=["GET"])
|
||||||
|
def auth():
|
||||||
|
"""Accepts a series of parameters"""
|
||||||
|
return render_template('login.html',
|
||||||
|
state=request.args.get('state'),
|
||||||
|
response_type=request.args.get('response_type'),
|
||||||
|
client_id=request.args.get('client_id'),
|
||||||
|
scope=request.args.get('scope'),
|
||||||
|
redirect_uri=request.args.get('redirect_uri'),
|
||||||
|
error_message=request.args.get('error_message'))
|
||||||
|
|
||||||
|
|
||||||
|
@openid_blueprint.route("/form_submit", methods=["POST"])
|
||||||
|
def form_submit():
|
||||||
|
|
||||||
|
users = get_users()
|
||||||
|
if request.values['Uname'] in users and request.values['Pass'] == users[request.values['Uname']]["password"]:
|
||||||
|
# Redirect back to the end user with some detailed information
|
||||||
|
state = request.values.get('state')
|
||||||
|
data = {
|
||||||
|
"state": base64.b64encode(bytes(state, 'UTF-8')),
|
||||||
|
"code": request.values['Uname'] + MY_SECRET_CODE
|
||||||
|
}
|
||||||
|
url = request.values.get('redirect_uri') + urlencode(data)
|
||||||
|
return redirect(url, code=200)
|
||||||
|
else:
|
||||||
|
return render_template('login.html',
|
||||||
|
state=request.values.get('state'),
|
||||||
|
response_type=request.values.get('response_type'),
|
||||||
|
client_id=request.values.get('client_id'),
|
||||||
|
scope=request.values.get('scope'),
|
||||||
|
redirect_uri=request.values.get('redirect_uri'),
|
||||||
|
error_message="Login failed. Please try agian.")
|
||||||
|
|
||||||
|
|
||||||
|
@openid_blueprint.route("/token", methods=["POST"])
|
||||||
|
def token():
|
||||||
|
"""Url that will return a valid token, given the super secret sauce"""
|
||||||
|
grant_type=request.values.get('grant_type')
|
||||||
|
code=request.values.get('code')
|
||||||
|
redirect_uri=request.values.get('redirect_uri')
|
||||||
|
|
||||||
|
"""We just stuffed the user name on the front of the code, so grab it."""
|
||||||
|
user_name, secret_hash = code.split(":")
|
||||||
|
|
||||||
|
"""Get authentication from headers."""
|
||||||
|
authorization = request.headers.get('Authorization')
|
||||||
|
authorization = authorization[6:] # Remove "Basic"
|
||||||
|
authorization = base64.b64decode(authorization).decode('utf-8')
|
||||||
|
client_id, client_secret = authorization.split(":")
|
||||||
|
|
||||||
|
base_url = url_for(openid_blueprint)
|
||||||
|
access_token = "..."
|
||||||
|
refresh_token = "..."
|
||||||
|
id_token = jwt.encode({
|
||||||
|
"iss": base_url,
|
||||||
|
"aud": [client_id, "account"],
|
||||||
|
"iat": time.time(),
|
||||||
|
"exp": time.time() + 86400 # Exprire after a day.
|
||||||
|
})
|
||||||
|
|
||||||
|
{'exp': 1669757386, 'iat': 1669755586, 'auth_time': 1669753049, 'jti': '0ec2cc09-3498-4921-a021-c3b98427df70',
|
||||||
|
'iss': 'http://localhost:7002/realms/spiffworkflow', 'aud': 'spiffworkflow-backend',
|
||||||
|
'sub': '99e7e4ea-d4ae-4944-bd31-873dac7b004c', 'typ': 'ID', 'azp': 'spiffworkflow-backend',
|
||||||
|
'session_state': '8751d5f6-2c60-4205-9be0-2b1005f5891e', 'at_hash': 'O5i-VLus6sryR0grMS2Y4w', 'acr': '0',
|
||||||
|
'sid': '8751d5f6-2c60-4205-9be0-2b1005f5891e', 'email_verified': False, 'preferred_username': 'dan'}
|
||||||
|
|
||||||
|
response = {
|
||||||
|
"access_token": id_token,
|
||||||
|
"id_token": id_token,
|
||||||
|
}
|
||||||
|
|
||||||
|
@openid_blueprint.route("/refresh", methods=["POST"])
|
||||||
|
def refresh():
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_users():
|
||||||
|
with open(current_app.config["PERMISSIONS_FILE_FULLPATH"]) as file:
|
||||||
|
permission_configs = yaml.safe_load(file)
|
||||||
|
if "users" in permission_configs:
|
||||||
|
return permission_configs["users"]
|
||||||
|
else:
|
||||||
|
return {}
|
|
@ -0,0 +1,103 @@
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>Login Form</title>
|
||||||
|
<link rel="stylesheet" type="text/css" href="css/style.css">
|
||||||
|
<style>
|
||||||
|
body{
|
||||||
|
margin: 0;
|
||||||
|
padding: 0;
|
||||||
|
background-color:white;
|
||||||
|
font-family: 'Arial';
|
||||||
|
}
|
||||||
|
.error {
|
||||||
|
margin: 20px auto;
|
||||||
|
color: red;
|
||||||
|
font-weight: bold;
|
||||||
|
text-align: center;
|
||||||
|
}
|
||||||
|
.login{
|
||||||
|
width: 382px;
|
||||||
|
overflow: hidden;
|
||||||
|
margin: 20px auto;
|
||||||
|
padding: 80px;
|
||||||
|
background: #000;
|
||||||
|
border-radius: 15px ;
|
||||||
|
|
||||||
|
}
|
||||||
|
h2{
|
||||||
|
text-align: center;
|
||||||
|
color: #277582;
|
||||||
|
padding: 20px;
|
||||||
|
}
|
||||||
|
label{
|
||||||
|
color: #fff;
|
||||||
|
font-size: 17px;
|
||||||
|
}
|
||||||
|
#Uname{
|
||||||
|
width: 300px;
|
||||||
|
height: 30px;
|
||||||
|
border: none;
|
||||||
|
border-radius: 3px;
|
||||||
|
padding-left: 8px;
|
||||||
|
}
|
||||||
|
#Pass{
|
||||||
|
width: 300px;
|
||||||
|
height: 30px;
|
||||||
|
border: none;
|
||||||
|
border-radius: 3px;
|
||||||
|
padding-left: 8px;
|
||||||
|
|
||||||
|
}
|
||||||
|
#log{
|
||||||
|
width: 300px;
|
||||||
|
height: 30px;
|
||||||
|
border: none;
|
||||||
|
border-radius: 17px;
|
||||||
|
padding-left: 7px;
|
||||||
|
color: blue;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
span{
|
||||||
|
color: white;
|
||||||
|
font-size: 17px;
|
||||||
|
}
|
||||||
|
a{
|
||||||
|
float: right;
|
||||||
|
background-color: grey;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h2>Login to SpiffWorkflow</h2><br>
|
||||||
|
<div class="error">{{error_message}}</div>
|
||||||
|
<div class="login">
|
||||||
|
<form id="login" method="post" action="{{ url_for('openid.form_submit') }}">
|
||||||
|
<label><b>User Name
|
||||||
|
</b>
|
||||||
|
</label>
|
||||||
|
<input type="text" name="Uname" id="Uname" placeholder="Username">
|
||||||
|
<br><br>
|
||||||
|
<label><b>Password
|
||||||
|
</b>
|
||||||
|
</label>
|
||||||
|
<input type="Password" name="Pass" id="Pass" placeholder="Password">
|
||||||
|
<br><br>
|
||||||
|
<input type="hidden" name="state" value="{{state}}"/>
|
||||||
|
<input type="hidden" name="response_type" value="{{response_type}}"/>
|
||||||
|
<input type="hidden" name="client_id" value="{{client_id}}"/>
|
||||||
|
<input type="hidden" name="scope" value="{{scope}}"/>
|
||||||
|
<input type="hidden" name="redirect_uri" value="{{redirect_uri}}"/>
|
||||||
|
<input type="submit" name="log" id="log" value="Log In">
|
||||||
|
<br><br>
|
||||||
|
<!-- should maybe add this stuff in eventually, but this is just for testing.
|
||||||
|
<input type="checkbox" id="check">
|
||||||
|
<span>Remember me</span>
|
||||||
|
<br><br>
|
||||||
|
Forgot <a href="#">Password</a>
|
||||||
|
-->
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>
|
|
@ -1,6 +1,7 @@
|
||||||
"""User."""
|
"""User."""
|
||||||
import ast
|
import ast
|
||||||
import base64
|
import base64
|
||||||
|
import json
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
@ -14,9 +15,12 @@ from flask import request
|
||||||
from flask_bpmn.api.api_error import ApiError
|
from flask_bpmn.api.api_error import ApiError
|
||||||
from werkzeug.wrappers import Response
|
from werkzeug.wrappers import Response
|
||||||
|
|
||||||
|
from spiffworkflow_backend.models.group import GroupModel
|
||||||
|
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
|
||||||
|
from spiffworkflow_backend.models.principal import PrincipalModel
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.services.authentication_service import (
|
from spiffworkflow_backend.services.authentication_service import (
|
||||||
AuthenticationService,
|
AuthenticationService, AuthenticationProviderTypes,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||||
from spiffworkflow_backend.services.user_service import UserService
|
from spiffworkflow_backend.services.user_service import UserService
|
||||||
|
@ -58,7 +62,6 @@ def verify_token(
|
||||||
decoded_token = get_decoded_token(token)
|
decoded_token = get_decoded_token(token)
|
||||||
|
|
||||||
if decoded_token is not None:
|
if decoded_token is not None:
|
||||||
|
|
||||||
if "token_type" in decoded_token:
|
if "token_type" in decoded_token:
|
||||||
token_type = decoded_token["token_type"]
|
token_type = decoded_token["token_type"]
|
||||||
if token_type == "internal": # noqa: S105
|
if token_type == "internal": # noqa: S105
|
||||||
|
@ -68,11 +71,11 @@ def verify_token(
|
||||||
current_app.logger.error(
|
current_app.logger.error(
|
||||||
f"Exception in verify_token getting user from decoded internal token. {e}"
|
f"Exception in verify_token getting user from decoded internal token. {e}"
|
||||||
)
|
)
|
||||||
|
|
||||||
elif "iss" in decoded_token.keys():
|
elif "iss" in decoded_token.keys():
|
||||||
try:
|
try:
|
||||||
user_info = AuthenticationService.get_user_info_from_open_id(token)
|
if AuthenticationService.validate_id_token(token):
|
||||||
except ApiError as ae:
|
user_info = decoded_token
|
||||||
|
except ApiError as ae: # API Error is only thrown in the token is outdated.
|
||||||
# Try to refresh the token
|
# Try to refresh the token
|
||||||
user = UserService.get_user_by_service_and_service_id(
|
user = UserService.get_user_by_service_and_service_id(
|
||||||
"open_id", decoded_token["sub"]
|
"open_id", decoded_token["sub"]
|
||||||
|
@ -86,14 +89,9 @@ def verify_token(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if auth_token and "error" not in auth_token:
|
if auth_token and "error" not in auth_token:
|
||||||
# redirect to original url, with auth_token?
|
# We have the user, but this code is a bit convoluted, and will later demand
|
||||||
user_info = (
|
# a user_info object so it can look up the user. Sorry to leave this crap here.
|
||||||
AuthenticationService.get_user_info_from_open_id(
|
user_info = {"sub": user.service_id }
|
||||||
auth_token["access_token"]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if not user_info:
|
|
||||||
raise ae
|
|
||||||
else:
|
else:
|
||||||
raise ae
|
raise ae
|
||||||
else:
|
else:
|
||||||
|
@ -202,6 +200,15 @@ def login(redirect_url: str = "/") -> Response:
|
||||||
)
|
)
|
||||||
return redirect(login_redirect_url)
|
return redirect(login_redirect_url)
|
||||||
|
|
||||||
|
def parse_id_token(token: str) -> dict:
|
||||||
|
parts = token.split(".")
|
||||||
|
if len(parts) != 3:
|
||||||
|
raise Exception("Incorrect id token format")
|
||||||
|
|
||||||
|
payload = parts[1]
|
||||||
|
padded = payload + '=' * (4 - len(payload) % 4)
|
||||||
|
decoded = base64.b64decode(padded)
|
||||||
|
return json.loads(decoded)
|
||||||
|
|
||||||
def login_return(code: str, state: str, session_state: str) -> Optional[Response]:
|
def login_return(code: str, state: str, session_state: str) -> Optional[Response]:
|
||||||
"""Login_return."""
|
"""Login_return."""
|
||||||
|
@ -211,10 +218,9 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
|
||||||
if "id_token" in auth_token_object:
|
if "id_token" in auth_token_object:
|
||||||
id_token = auth_token_object["id_token"]
|
id_token = auth_token_object["id_token"]
|
||||||
|
|
||||||
|
user_info = parse_id_token(id_token)
|
||||||
|
|
||||||
if AuthenticationService.validate_id_token(id_token):
|
if AuthenticationService.validate_id_token(id_token):
|
||||||
user_info = AuthenticationService.get_user_info_from_open_id(
|
|
||||||
auth_token_object["access_token"]
|
|
||||||
)
|
|
||||||
if user_info and "error" not in user_info:
|
if user_info and "error" not in user_info:
|
||||||
user_model = AuthorizationService.create_user_from_sign_in(user_info)
|
user_model = AuthorizationService.create_user_from_sign_in(user_info)
|
||||||
g.user = user_model.id
|
g.user = user_model.id
|
||||||
|
@ -332,15 +338,11 @@ def get_user_from_decoded_internal_token(decoded_token: dict) -> Optional[UserMo
|
||||||
.filter(UserModel.service_id == service_id)
|
.filter(UserModel.service_id == service_id)
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
# user: UserModel = UserModel.query.filter()
|
|
||||||
if user:
|
if user:
|
||||||
return user
|
return user
|
||||||
user = UserModel(
|
user = UserModel(
|
||||||
username=service_id,
|
username=service_id,
|
||||||
uid=service_id,
|
|
||||||
service=service,
|
service=service,
|
||||||
service_id=service_id,
|
service_id=service_id,
|
||||||
name="API User",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
|
@ -42,43 +42,6 @@ class AuthenticationService:
|
||||||
open_id_client_secret_key,
|
open_id_client_secret_key,
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_user_info_from_open_id(cls, token: str) -> dict:
|
|
||||||
"""The token is an auth_token."""
|
|
||||||
(
|
|
||||||
open_id_server_url,
|
|
||||||
open_id_client_id,
|
|
||||||
open_id_realm_name,
|
|
||||||
open_id_client_secret_key,
|
|
||||||
) = cls.get_open_id_args()
|
|
||||||
|
|
||||||
headers = {"Authorization": f"Bearer {token}"}
|
|
||||||
|
|
||||||
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/userinfo"
|
|
||||||
try:
|
|
||||||
request_response = requests.get(request_url, headers=headers)
|
|
||||||
except Exception as e:
|
|
||||||
current_app.logger.error(f"Exception in get_user_info_from_id_token: {e}")
|
|
||||||
raise ApiError(
|
|
||||||
error_code="token_error",
|
|
||||||
message=f"Exception in get_user_info_from_id_token: {e}",
|
|
||||||
status_code=401,
|
|
||||||
) from e
|
|
||||||
|
|
||||||
if request_response.status_code == 401:
|
|
||||||
raise ApiError(
|
|
||||||
error_code="invalid_token", message="Please login", status_code=401
|
|
||||||
)
|
|
||||||
elif request_response.status_code == 200:
|
|
||||||
user_info: dict = json.loads(request_response.text)
|
|
||||||
return user_info
|
|
||||||
|
|
||||||
raise ApiError(
|
|
||||||
error_code="user_info_error",
|
|
||||||
message="Cannot get user info in get_user_info_from_id_token",
|
|
||||||
status_code=401,
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_backend_url() -> str:
|
def get_backend_url() -> str:
|
||||||
"""Get_backend_url."""
|
"""Get_backend_url."""
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
"""Authorization_service."""
|
"""Authorization_service."""
|
||||||
|
import inspect
|
||||||
import re
|
import re
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
@ -23,6 +24,7 @@ from spiffworkflow_backend.models.principal import PrincipalModel
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.models.user import UserNotFoundError
|
from spiffworkflow_backend.models.user import UserNotFoundError
|
||||||
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
|
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
|
||||||
|
from spiffworkflow_backend.routes.openid_blueprint import openid_blueprint
|
||||||
from spiffworkflow_backend.services.group_service import GroupService
|
from spiffworkflow_backend.services.group_service import GroupService
|
||||||
from spiffworkflow_backend.services.user_service import UserService
|
from spiffworkflow_backend.services.user_service import UserService
|
||||||
|
|
||||||
|
@ -125,6 +127,7 @@ class AuthorizationService:
|
||||||
db.session.add(user_group_assignemnt)
|
db.session.add(user_group_assignemnt)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def import_permissions_from_yaml_file(
|
def import_permissions_from_yaml_file(
|
||||||
cls, raise_if_missing_user: bool = False
|
cls, raise_if_missing_user: bool = False
|
||||||
|
@ -241,6 +244,7 @@ class AuthorizationService:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
api_view_function = current_app.view_functions[request.endpoint]
|
api_view_function = current_app.view_functions[request.endpoint]
|
||||||
|
module = inspect.getmodule(api_view_function)
|
||||||
if (
|
if (
|
||||||
api_view_function
|
api_view_function
|
||||||
and api_view_function.__name__.startswith("login")
|
and api_view_function.__name__.startswith("login")
|
||||||
|
@ -248,6 +252,7 @@ class AuthorizationService:
|
||||||
or api_view_function.__name__.startswith("console_ui_")
|
or api_view_function.__name__.startswith("console_ui_")
|
||||||
or api_view_function.__name__ in authentication_exclusion_list
|
or api_view_function.__name__ in authentication_exclusion_list
|
||||||
or api_view_function.__name__ in swagger_functions
|
or api_view_function.__name__ in swagger_functions
|
||||||
|
or module == openid_blueprint
|
||||||
):
|
):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -442,6 +447,7 @@ class AuthorizationService:
|
||||||
email=email,
|
email=email,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# this may eventually get too slow.
|
# this may eventually get too slow.
|
||||||
# when it does, be careful about backgrounding, because
|
# when it does, be careful about backgrounding, because
|
||||||
# the user will immediately need permissions to use the site.
|
# the user will immediately need permissions to use the site.
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
"""Test_authentication."""
|
||||||
|
import ast
|
||||||
|
import base64
|
||||||
|
|
||||||
|
from flask import Flask
|
||||||
|
from flask.testing import FlaskClient
|
||||||
|
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
|
||||||
|
from spiffworkflow_backend.services.authentication_service import (
|
||||||
|
AuthenticationService,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestFaskOpenId(BaseTest):
|
||||||
|
"""An integrated Open ID that responds to openID requests
|
||||||
|
by referencing a build in YAML file. Useful for
|
||||||
|
local development, testing, demos etc..."""
|
||||||
|
|
||||||
|
def test_discovery_of_endpoints(self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,) -> None:
|
||||||
|
response = client.get("/openid/well-known/openid-configuration")
|
||||||
|
discovered_urls = response.json
|
||||||
|
assert "http://localhost/openid" == discovered_urls["issuer"]
|
||||||
|
assert "http://localhost/openid/auth" == discovered_urls["authorization_endpoint"]
|
||||||
|
assert "http://localhost/openid/token" == discovered_urls["token_endpoint"]
|
||||||
|
|
||||||
|
def test_get_login_page(self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,) -> None:
|
||||||
|
# It should be possible to get to a login page
|
||||||
|
data = {
|
||||||
|
"state": {"bubblegum":1, "daydream":2}
|
||||||
|
}
|
||||||
|
response = client.get("/openid/auth", query_string=data)
|
||||||
|
assert b"<h2>Login to SpiffWorkflow</h2>" in response.data
|
||||||
|
assert b"bubblegum" in response.data
|
||||||
|
|
||||||
|
def test_get_token(self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,) -> None:
|
||||||
|
|
||||||
|
code = "c3BpZmZ3b3JrZmxvdy1iYWNrZW5kOkpYZVFFeG0wSmhRUEx1bWdIdElJcWY1MmJEYWxIejBx"
|
||||||
|
headers = {
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
|
"Authorization": f"Basic {code}",
|
||||||
|
}
|
||||||
|
data = {
|
||||||
|
"grant_type": 'authorization_code',
|
||||||
|
"code": code,
|
||||||
|
"redirect_url": 'http://localhost:7000/v1.0/login_return'
|
||||||
|
}
|
||||||
|
response = client.post("/openid/token", data=data, headers=headers)
|
||||||
|
assert response
|
||||||
|
|
||||||
|
def test_refresh_token_endpoint(self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,) -> None:
|
||||||
|
pass
|
||||||
|
# Handle a refresh with the following
|
||||||
|
# data provided
|
||||||
|
# "grant_type": "refresh_token",
|
||||||
|
# "refresh_token": refresh_token,
|
||||||
|
# "client_id": open_id_client_id,
|
||||||
|
# "client_secret": open_id_client_secret_key,
|
||||||
|
# Return an json response with:
|
||||||
|
# id - (this users' id)
|
||||||
|
|
||||||
|
def test_logout(self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,) -> None:
|
||||||
|
pass
|
||||||
|
# It should be possible to logout and be redirected back.
|
Loading…
Reference in New Issue