Login to the Swagger api

This commit is contained in:
mike cullerton 2022-10-10 14:30:56 -04:00
parent b0cd3539de
commit 7865bc9c64
3 changed files with 107 additions and 84 deletions

View File

@ -85,47 +85,41 @@ paths:
"200":
description: Logout Authenticated User
# /login_api:
# parameters:
# - name: redirect_url
# in: query
# required: false
# schema:
# type: string
# get:
# security: []
# operationId: spiffworkflow_backend.routes.user.login_api
# summary: Authenticate user for API access
# tags:
# - Authentication
# responses:
# "304":
# description: Redirection to the hosted frontend with an auth_token header.
# /login_api_return:
# parameters:
# - name: code
# in: query
# required: true
# schema:
# type: string
# - name: state
# in: query
# required: true
# schema:
# type: string
# - name: session_state
# in: query
# required: false
# schema:
# type: string
# get:
# security: []
# operationId: spiffworkflow_backend.routes.user.login_api_return
# tags:
# - Authentication
# responses:
# "200":
# description: Test Return Response
/login_api:
get:
security: []
operationId: spiffworkflow_backend.routes.user.login_api
summary: Authenticate user for API access
tags:
- Authentication
responses:
"200":
description: Redirects to authentication server
/login_api_return:
parameters:
- name: code
in: query
required: true
schema:
type: string
- name: state
in: query
required: true
schema:
type: string
- name: session_state
in: query
required: false
schema:
type: string
get:
security: []
operationId: spiffworkflow_backend.routes.user.login_api_return
tags:
- Authentication
responses:
"200":
description: Test Return Response
/status:
get:
@ -1250,7 +1244,7 @@ components:
flows:
authorizationCode:
authorizationUrl: /v1.0/login_api
tokenUrl: /v1.0/login_return
tokenUrl: /v1.0/login_api_return
scopes:
read_email: read email
x-tokenInfoFunc: spiffworkflow_backend.routes.user.get_scope

View File

@ -10,6 +10,7 @@ import jwt
from flask import current_app
from flask import g
from flask import redirect
from flask import request
from flask_bpmn.api.api_error import ApiError
from werkzeug.wrappers.response import Response
@ -214,49 +215,77 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
state_redirect_url = state_dict["redirect_url"]
id_token_object = PublicAuthenticationService().get_id_token_object(code)
id_token = id_token_object["id_token"]
if "id_token" in id_token_object:
id_token = id_token_object["id_token"]
if PublicAuthenticationService.validate_id_token(id_token):
user_info = PublicAuthenticationService.get_user_info_from_id_token(
id_token_object["access_token"]
)
if user_info and "error" not in user_info:
user_model = (
UserModel.query.filter(UserModel.service == "open_id")
.filter(UserModel.service_id == user_info["sub"])
.first()
if PublicAuthenticationService.validate_id_token(id_token):
user_info = PublicAuthenticationService.get_user_info_from_id_token(
id_token_object["access_token"]
)
if user_model is None:
current_app.logger.debug("create_user in login_return")
name = username = email = ""
if "name" in user_info:
name = user_info["name"]
if "username" in user_info:
username = user_info["username"]
elif "preferred_username" in user_info:
username = user_info["preferred_username"]
if "email" in user_info:
email = user_info["email"]
user_model = UserService().create_user(
service="open_id",
service_id=user_info["sub"],
name=name,
username=username,
email=email,
if user_info and "error" not in user_info:
user_model = (
UserModel.query.filter(UserModel.service == "open_id")
.filter(UserModel.service_id == user_info["sub"])
.first()
)
if user_model:
g.user = user_model.id
if user_model is None:
current_app.logger.debug("create_user in login_return")
name = username = email = ""
if "name" in user_info:
name = user_info["name"]
if "username" in user_info:
username = user_info["username"]
elif "preferred_username" in user_info:
username = user_info["preferred_username"]
if "email" in user_info:
email = user_info["email"]
user_model = UserService().create_user(
service="open_id",
service_id=user_info["sub"],
name=name,
username=username,
email=email,
)
redirect_url = (
f"{state_redirect_url}?"
+ f"access_token={id_token_object['access_token']}&"
+ f"id_token={id_token}"
)
return redirect(redirect_url)
raise ApiError(
code="invalid_login", message="Login failed. Please try again", status_code=401
)
if user_model:
g.user = user_model.id
redirect_url = (
f"{state_redirect_url}?"
+ f"access_token={id_token_object['access_token']}&"
+ f"id_token={id_token}"
)
return redirect(redirect_url)
raise ApiError(
code="invalid_login", message="Login failed. Please try again", status_code=401
)
else:
raise ApiError(
code="invalid_token", message="Login failed. Please try again", status_code=401
)
def login_api():
if "SWAGGER_URL" in current_app.config:
redirect_url = "/v1.0/login_api_return"
state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(
state.decode("UTF-8"),
redirect_url
)
return redirect(login_redirect_url)
def login_api_return(code: str, state: str, session_state: str):
state_dict = ast.literal_eval(base64.b64decode(state).decode("utf-8"))
state_redirect_url = state_dict["redirect_url"]
id_token_object = PublicAuthenticationService().get_id_token_object(code, "/v1.0/login_api_return")
return id_token_object['access_token']
print("login_api_return")
def logout(id_token: str, redirect_url: Optional[str]) -> Response:

View File

@ -114,7 +114,7 @@ class PublicAuthenticationService:
state = base64.b64encode(bytes(str({"redirect_url": redirect_url}), "UTF-8"))
return state
def get_login_redirect_url(self, state: str) -> str:
def get_login_redirect_url(self, state: str, redirect_url: str = "/v1.0/login_return") -> str:
"""Get_login_redirect_url."""
(
open_id_server_url,
@ -122,7 +122,7 @@ class PublicAuthenticationService:
open_id_realm_name,
open_id_client_secret_key,
) = PublicAuthenticationService.get_open_id_args()
return_redirect_url = f"{self.get_backend_url()}/v1.0/login_return"
return_redirect_url = f"{self.get_backend_url()}{redirect_url}"
login_redirect_url = (
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/auth?"
+ f"state={state}&"
@ -133,7 +133,7 @@ class PublicAuthenticationService:
)
return login_redirect_url
def get_id_token_object(self, code: str) -> dict:
def get_id_token_object(self, code: str, redirect_url: str = "/v1.0/login_return") -> dict:
"""Get_id_token_object."""
(
open_id_server_url,
@ -152,7 +152,7 @@ class PublicAuthenticationService:
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": f"{self.get_backend_url()}/v1.0/login_return",
"redirect_uri": f"{self.get_backend_url()}{redirect_url}",
}
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"