Merge commit '4a48d9cccd1ca8619b3dbef3c10bcce667c9d9e0'

This commit is contained in:
burnettk 2022-10-20 16:00:12 -04:00
commit f7557c48e9
28 changed files with 1095 additions and 455 deletions

View File

@ -0,0 +1,14 @@
"""Deletes all permissions and then re-imports from yaml file."""
from spiffworkflow_backend import get_hacked_up_app_for_script
from spiffworkflow_backend.services.authorization_service import AuthorizationService
def main() -> None:
"""Main."""
app = get_hacked_up_app_for_script()
with app.app_context():
AuthorizationService.delete_all_permissions_and_recreate()
if __name__ == "__main__":
main()

View File

@ -630,6 +630,28 @@
"notBefore": 0,
"groups": []
},
{
"id": "9b46f3be-a81d-4b76-92e6-2ac8462f5ec8",
"createdTimestamp": 1665688255982,
"username": "finance_user1",
"enabled": true,
"totp": false,
"emailVerified": false,
"credentials": [
{
"id": "f14722ec-13a7-4d35-a4ec-0475d405ae58",
"type": "password",
"createdDate": 1665688275943,
"secretData": "{\"value\":\"PlNhf8ShIvaSP3CUwCwAJ2tkqcTCVmCWUy4rbuLSXxEIiuGMu4XeZdsrE82R8PWuDQhlWn/YOUOk38xKZS2ySQ==\",\"salt\":\"m7JGY2cWgFBXMYQSSP2JQQ==\",\"additionalParameters\":{}}",
"credentialData": "{\"hashIterations\":27500,\"algorithm\":\"pbkdf2-sha256\",\"additionalParameters\":{}}"
}
],
"disableableCredentialTypes": [],
"requiredActions": [],
"realmRoles": ["default-roles-spiffworkflow"],
"notBefore": 0,
"groups": []
},
{
"id": "087bdc16-e362-4340-aa60-1ff71a45f844",
"createdTimestamp": 1665516884829,
@ -828,31 +850,6 @@
"notBefore": 0,
"groups": []
},
{
"id": "a15da457-7ebb-49d4-9dcc-6876cb71600d",
"createdTimestamp": 1657115919770,
"username": "repeat_form_user_1",
"enabled": true,
"totp": false,
"emailVerified": false,
"credentials": [
{
"id": "509dfd8d-a54e-4d8b-b250-ec99e585e15d",
"type": "password",
"createdDate": 1657298008525,
"secretData": "{\"value\":\"/47zG9XBvKg+1P2z6fRL4cyUNn+sB4BgXsxBsvi1NYR9Z20WTeWzzOT2uXvv2ajKMRHrv0OqTesldvSJXARPqA==\",\"salt\":\"dODEHOF24xGPx+7QGaIXWQ==\",\"additionalParameters\":{}}",
"credentialData": "{\"hashIterations\":27500,\"algorithm\":\"pbkdf2-sha256\",\"additionalParameters\":{}}"
}
],
"disableableCredentialTypes": [],
"requiredActions": [],
"realmRoles": ["default-roles-spiffworkflow"],
"clientRoles": {
"spiffworkflow-backend": ["uma_protection", "repeat-form-role-2"]
},
"notBefore": 0,
"groups": []
},
{
"id": "f3852a7d-8adf-494f-b39d-96ad4c899ee5",
"createdTimestamp": 1665516926300,

View File

@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -57,6 +58,7 @@ def with_db_and_bpmn_file_cleanup() -> None:
"""Process_group_resource."""
for model in SpiffworkflowBaseDBModel._all_subclasses():
db.session.query(model).delete()
db.session.commit()
try:
yield
@ -66,6 +68,12 @@ def with_db_and_bpmn_file_cleanup() -> None:
shutil.rmtree(process_model_service.root_path())
@pytest.fixture()
def with_super_admin_user() -> UserModel:
"""With_super_admin_user."""
return BaseTest.create_user_with_permission("super_admin")
@pytest.fixture()
def setup_process_instances_for_reports() -> list[ProcessInstanceModel]:
"""Setup_process_instances_for_reports."""

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 9e14b40371f3
Revision ID: e6b28d8e3178
Revises:
Create Date: 2022-10-19 19:31:20.431800
Create Date: 2022-10-20 13:05:25.896486
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9e14b40371f3'
revision = 'e6b28d8e3178'
down_revision = None
branch_labels = None
depends_on = None
@ -134,11 +134,21 @@ def upgrade():
op.create_index(op.f('ix_process_instance_report_identifier'), 'process_instance_report', ['identifier'], unique=False)
op.create_index(op.f('ix_process_instance_report_process_group_identifier'), 'process_instance_report', ['process_group_identifier'], unique=False)
op.create_index(op.f('ix_process_instance_report_process_model_identifier'), 'process_instance_report', ['process_model_identifier'], unique=False)
op.create_table('refresh_token',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=1024), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('user_id')
)
op.create_table('secret',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=50), nullable=False),
sa.Column('value', sa.Text(), nullable=False),
sa.Column('creator_user_id', sa.Integer(), nullable=False),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['creator_user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key')
@ -226,8 +236,8 @@ def upgrade():
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('principal_id', sa.Integer(), nullable=False),
sa.Column('permission_target_id', sa.Integer(), nullable=False),
sa.Column('grant_type', sa.String(length=50), nullable=True),
sa.Column('permission', sa.String(length=50), nullable=True),
sa.Column('grant_type', sa.String(length=50), nullable=False),
sa.Column('permission', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
sa.PrimaryKeyConstraint('id'),
@ -316,6 +326,7 @@ def downgrade():
op.drop_table('active_task')
op.drop_table('user_group_assignment')
op.drop_table('secret')
op.drop_table('refresh_token')
op.drop_index(op.f('ix_process_instance_report_process_model_identifier'), table_name='process_instance_report')
op.drop_index(op.f('ix_process_instance_report_process_group_identifier'), table_name='process_instance_report')
op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report')

View File

@ -95,7 +95,7 @@ python-versions = ">=3.5"
dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy (>=0.900,!=0.940)", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "sphinx", "sphinx-notfound-page", "zope.interface"]
docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"]
tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "zope.interface"]
tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"]
tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"]
[[package]]
name = "Babel"
@ -268,7 +268,7 @@ optional = false
python-versions = ">=3.6.0"
[package.extras]
unicode_backport = ["unicodedata2"]
unicode-backport = ["unicodedata2"]
[[package]]
name = "classify-imports"
@ -639,7 +639,7 @@ werkzeug = "*"
type = "git"
url = "https://github.com/sartography/flask-bpmn"
reference = "main"
resolved_reference = "bd4b45a842ed63a29e74ff02ea7f2a56d7b2298a"
resolved_reference = "c8fd01df47518749a074772fec383256c482139f"
[[package]]
name = "Flask-Cors"
@ -1512,7 +1512,7 @@ urllib3 = ">=1.21.1,<1.27"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "requests-toolbelt"
@ -1625,7 +1625,7 @@ falcon = ["falcon (>=1.4)"]
fastapi = ["fastapi (>=0.79.0)"]
flask = ["blinker (>=1.1)", "flask (>=0.11)"]
httpx = ["httpx (>=0.16.0)"]
pure_eval = ["asttokens", "executing", "pure-eval"]
pure-eval = ["asttokens", "executing", "pure-eval"]
pyspark = ["pyspark (>=2.4.4)"]
quart = ["blinker (>=1.1)", "quart (>=0.16.1)"]
rq = ["rq (>=0.6)"]
@ -1847,7 +1847,7 @@ test = ["pytest"]
[[package]]
name = "SpiffWorkflow"
version = "1.2.0"
version = "1.2.1"
description = "A workflow framework and BPMN/DMN Processor"
category = "main"
optional = false
@ -1858,7 +1858,6 @@ develop = false
celery = "*"
configparser = "*"
dateparser = "*"
importlib-metadata = "<5.0"
lxml = "*"
pytz = "*"
@ -1884,19 +1883,19 @@ aiomysql = ["aiomysql", "greenlet (!=0.4.17)"]
aiosqlite = ["aiosqlite", "greenlet (!=0.4.17)", "typing_extensions (!=3.10.0.1)"]
asyncio = ["greenlet (!=0.4.17)"]
asyncmy = ["asyncmy (>=0.2.3,!=0.2.4)", "greenlet (!=0.4.17)"]
mariadb_connector = ["mariadb (>=1.0.1,!=1.1.2)"]
mariadb-connector = ["mariadb (>=1.0.1,!=1.1.2)"]
mssql = ["pyodbc"]
mssql_pymssql = ["pymssql"]
mssql_pyodbc = ["pyodbc"]
mssql-pymssql = ["pymssql"]
mssql-pyodbc = ["pyodbc"]
mypy = ["mypy (>=0.910)", "sqlalchemy2-stubs"]
mysql = ["mysqlclient (>=1.4.0)", "mysqlclient (>=1.4.0,<2)"]
mysql_connector = ["mysql-connector-python"]
mysql-connector = ["mysql-connector-python"]
oracle = ["cx_oracle (>=7)", "cx_oracle (>=7,<8)"]
postgresql = ["psycopg2 (>=2.7)"]
postgresql_asyncpg = ["asyncpg", "greenlet (!=0.4.17)"]
postgresql_pg8000 = ["pg8000 (>=1.16.6,!=1.29.0)"]
postgresql_psycopg2binary = ["psycopg2-binary"]
postgresql_psycopg2cffi = ["psycopg2cffi"]
postgresql-asyncpg = ["asyncpg", "greenlet (!=0.4.17)"]
postgresql-pg8000 = ["pg8000 (>=1.16.6,!=1.29.0)"]
postgresql-psycopg2binary = ["psycopg2-binary"]
postgresql-psycopg2cffi = ["psycopg2cffi"]
pymysql = ["pymysql", "pymysql (<1)"]
sqlcipher = ["sqlcipher3_binary"]
@ -2030,7 +2029,7 @@ python-versions = "*"
name = "types-PyYAML"
version = "6.0.12"
description = "Typing stubs for PyYAML"
category = "main"
category = "dev"
optional = false
python-versions = "*"
@ -2234,7 +2233,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "1.1"
python-versions = ">=3.9,<3.11"
content-hash = "2602fd47f14d1163b2590ab01d3adb1ce881c699bb09630e6fdfc56b919a7a4e"
content-hash = "cff4bcfd10157833f1a0f0bb806c3543267c3e99cc13f311b328d101c30ac553"
[metadata.files]
alabaster = [
@ -3013,18 +3012,7 @@ py = [
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
]
pyasn1 = [
{file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
{file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
{file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
{file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
{file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
{file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
{file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
{file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
{file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
{file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
{file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
{file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
{file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
]
pycodestyle = [

View File

@ -19,7 +19,9 @@ import spiffworkflow_backend.load_database_models # noqa: F401
from spiffworkflow_backend.config import setup_config
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
from spiffworkflow_backend.routes.user import verify_token
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.background_processing_service import (
BackgroundProcessingService,
)
@ -114,6 +116,9 @@ def create_app() -> flask.app.Flask:
configure_sentry(app)
app.before_request(verify_token)
app.before_request(AuthorizationService.check_for_permission)
return app # type: ignore

View File

@ -1,13 +1,14 @@
openapi: "3.0.2"
info:
version: 1.0.0
title: Workflow Microservice
title: spiffworkflow-backend
license:
name: MIT
servers:
- url: http://localhost:5000/v1.0
security:
- jwt: ["secret"]
# this is handled in flask now
security: []
# - jwt: ["secret"]
# - oAuth2AuthCode:
# - read_email
# - uid
@ -378,7 +379,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
# process model update
put:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_update
summary: Modifies an existing process mosel with the given parameters.
@ -827,7 +827,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/File"
# process_model_file_update
put:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_file_update
summary: save the contents to the given file
@ -1250,6 +1249,25 @@ paths:
"404":
description: Secret does not exist
/permissions-check:
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.permissions_check
summary: Checks if current user has access to given list of target uris and permissions.
tags:
- Permissions
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/Secret"
responses:
"200":
description: Result of permission check
content:
application/json:
schema:
$ref: "#/components/schemas/Secret"
components:
securitySchemes:
jwt:

View File

@ -1,13 +1,25 @@
groups:
admin:
users:
[jakub, kb, alex, dan, mike, jason, amir, jarrad, elizabeth, jon, natalia]
[
jakub,
kb,
alex,
dan,
mike,
jason,
amir,
jarrad,
elizabeth,
jon,
harmeet,
sasha,
manuchehr,
natalia,
]
finance:
users: [harmeet, sasha]
hr:
users: [manuchehr]
users: [finance_user1]
permissions:
admin:
@ -20,10 +32,10 @@ permissions:
groups: [finance]
users: []
allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-groups/finance/*
uri: /v1.0/process-groups/execute-procure-to-pay/*
read-all:
groups: [finance, hr, admin]
groups: [finance, admin]
users: []
allowed_permissions: [read]
uri: /*

View File

@ -11,3 +11,7 @@ SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = (
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME", default="testing.yml"
)
SPIFFWORKFLOW_BACKEND_LOG_LEVEL = environ.get(
"SPIFFWORKFLOW_BACKEND_LOG_LEVEL", default="debug"
)

View File

@ -45,6 +45,7 @@ from spiffworkflow_backend.models.process_instance import (
from spiffworkflow_backend.models.process_instance_report import (
ProcessInstanceReportModel,
) # noqa: F401
from spiffworkflow_backend.models.refresh_token import RefreshTokenModel # noqa: F401
from spiffworkflow_backend.models.secret_model import SecretModel # noqa: F401
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel # noqa: F401
from spiffworkflow_backend.models.task_event import TaskEventModel # noqa: F401

View File

@ -29,4 +29,4 @@ class GroupModel(FlaskBpmnGroupModel):
secondary="user_group_assignment",
overlaps="user_group_assignments,users",
)
principal = relationship("PrincipalModel", uselist=False) # type: ignore
principal = relationship("PrincipalModel", uselist=False, cascade="all, delete") # type: ignore

View File

@ -31,7 +31,13 @@ class Permission(enum.Enum):
read = "read"
update = "update"
delete = "delete"
# maybe read to GET process_model/process-instances instead?
list = "list"
# maybe use create instead on
# POST http://localhost:7000/v1.0/process-models/category_number_one/call-activity/process-instances/*
# POST http://localhost:7000/v1.0/process-models/category_number_one/call-activity/process-instances/332/run
instantiate = "instantiate" # this is something you do to a process model
@ -50,10 +56,10 @@ class PermissionAssignmentModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
principal_id = db.Column(ForeignKey(PrincipalModel.id), nullable=False)
permission_target_id = db.Column(
ForeignKey(PermissionTargetModel.id), nullable=False
ForeignKey(PermissionTargetModel.id), nullable=False # type: ignore
)
grant_type = db.Column(db.String(50))
permission = db.Column(db.String(50))
grant_type = db.Column(db.String(50), nullable=False)
permission = db.Column(db.String(50), nullable=False)
@validates("grant_type")
def validate_grant_type(self, key: str, value: str) -> Any:

View File

@ -1,5 +1,7 @@
"""PermissionTarget."""
import re
from dataclasses import dataclass
from typing import Optional
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
@ -10,13 +12,23 @@ class InvalidPermissionTargetUriError(Exception):
"""InvalidPermissionTargetUriError."""
@dataclass
class PermissionTargetModel(SpiffworkflowBaseDBModel):
"""PermissionTargetModel."""
URI_ALL = "/%"
__tablename__ = "permission_target"
id = db.Column(db.Integer, primary_key=True)
uri = db.Column(db.String(255), unique=True, nullable=False)
id: int = db.Column(db.Integer, primary_key=True)
uri: str = db.Column(db.String(255), unique=True, nullable=False)
def __init__(self, uri: str, id: Optional[int] = None):
"""__init__."""
if id:
self.id = id
uri_with_percent = re.sub(r"\*", "%", uri)
self.uri = uri_with_percent
@validates("uri")
def validate_uri(self, key: str, value: str) -> str:

View File

@ -0,0 +1,22 @@
"""Refresh_token."""
from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
# from sqlalchemy.orm import relationship
# from spiffworkflow_backend.models.user import UserModel
@dataclass()
class RefreshTokenModel(SpiffworkflowBaseDBModel):
"""RefreshTokenModel."""
__tablename__ = "refresh_token"
id: int = db.Column(db.Integer, primary_key=True)
user_id: int = db.Column(ForeignKey("user.id"), nullable=False, unique=True)
token: str = db.Column(db.String(1024), nullable=False)
# user = relationship("UserModel", back_populates="refresh_token")

View File

@ -18,6 +18,8 @@ class SecretModel(SpiffworkflowBaseDBModel):
key: str = db.Column(db.String(50), unique=True, nullable=False)
value: str = db.Column(db.Text(), nullable=False)
creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
class SecretModelSchema(Schema):

View File

@ -55,6 +55,7 @@ from spiffworkflow_backend.models.secret_model import SecretModelSchema
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.user import verify_token
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.git_service import GitService
@ -97,6 +98,39 @@ def status() -> flask.wrappers.Response:
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def permissions_check(body: Dict[str, Dict[str, list[str]]]) -> flask.wrappers.Response:
"""Permissions_check."""
if "requests_to_check" not in body:
raise (
ApiError(
error_code="could_not_requests_to_check",
message="The key 'requests_to_check' not found at root of request body.",
status_code=400,
)
)
response_dict: dict[str, dict[str, bool]] = {}
requests_to_check = body["requests_to_check"]
for target_uri, http_methods in requests_to_check.items():
if target_uri not in response_dict:
response_dict[target_uri] = {}
for http_method in http_methods:
permission_string = AuthorizationService.get_permission_from_http_method(
http_method
)
if permission_string:
has_permission = AuthorizationService.user_has_permission(
user=g.user,
permission=permission_string,
target_uri=target_uri,
)
response_dict[target_uri][http_method] = has_permission
return make_response(jsonify({"results": response_dict}), 200)
def process_group_add(
body: Dict[str, Union[str, bool, int]]
) -> flask.wrappers.Response:
@ -794,9 +828,8 @@ def authentication_callback(
auth_method: str,
) -> werkzeug.wrappers.Response:
"""Authentication_callback."""
verify_token(request.args.get("token"))
verify_token(request.args.get("token"), force_run=True)
response = request.args["response"]
print(f"response: {response}")
SecretService().update_secret(
f"{service}/{auth_method}", response, g.user.id, create_if_not_exists=True
)
@ -848,6 +881,8 @@ def process_instance_report_show(
return Response(json.dumps(result_dict), status=200, mimetype="application/json")
# TODO: see comment for before_request
# @process_api_blueprint.route("/v1.0/tasks", methods=["GET"])
def task_list_my_tasks(page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
"""Task_list_my_tasks."""
principal = find_principal_or_raise()

View File

@ -10,12 +10,13 @@ import jwt
from flask import current_app
from flask import g
from flask import redirect
from flask import request
from flask_bpmn.api.api_error import ApiError
from werkzeug.wrappers import Response
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authentication_service import (
PublicAuthenticationService,
AuthenticationService,
)
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
@ -26,13 +27,17 @@ from spiffworkflow_backend.services.user_service import UserService
"""
def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, int]]]:
# authorization_exclusion_list = ['status']
def verify_token(
token: Optional[str] = None, force_run: Optional[bool] = False
) -> Optional[Dict[str, Optional[Union[str, int]]]]:
"""Verify the token for the user (if provided).
If in production environment and token is not provided, gets user from the SSO headers and returns their token.
Args:
token: Optional[str]
force_run: Optional[bool]
Returns:
token: str
@ -41,6 +46,12 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
ApiError: If not on production and token is not valid, returns an 'invalid_token' 403 error.
If on production and user is not authenticated, returns a 'no_user' 403 error.
"""
if not force_run and AuthorizationService.should_disable_auth_for_request():
return None
if not token and "Authorization" in request.headers:
token = request.headers["Authorization"].removeprefix("Bearer ")
if token:
user_model = None
decoded_token = get_decoded_token(token)
@ -59,10 +70,34 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
elif "iss" in decoded_token.keys():
try:
user_info = PublicAuthenticationService.get_user_info_from_id_token(
token
)
user_info = AuthenticationService.get_user_info_from_open_id(token)
except ApiError as ae:
# Try to refresh the token
user = UserService.get_user_by_service_and_service_id(
"open_id", decoded_token["sub"]
)
if user:
refresh_token = AuthenticationService.get_refresh_token(user.id)
if refresh_token:
auth_token: dict = (
AuthenticationService.get_auth_token_from_refresh_token(
refresh_token
)
)
if auth_token and "error" not in auth_token:
# redirect to original url, with auth_token?
user_info = (
AuthenticationService.get_user_info_from_open_id(
auth_token["access_token"]
)
)
if not user_info:
raise ae
else:
raise ae
else:
raise ae
else:
raise ae
except Exception as e:
current_app.logger.error(f"Exception raised in get_token: {e}")
@ -106,9 +141,11 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
# If the user is valid, store the token for this session
if g.user:
# This is an id token, so we don't have a refresh token yet
g.token = token
scope = get_scope(token)
return {"uid": g.user.id, "sub": g.user.id, "scope": scope}
get_scope(token)
return None
# return {"uid": g.user.id, "sub": g.user.id, "scope": scope}
# return validate_scope(token, user_info, user_model)
else:
raise ApiError(error_code="no_user_id", message="Cannot get a user id")
@ -116,67 +153,20 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
raise ApiError(
error_code="invalid_token", message="Cannot validate token.", status_code=401
)
# no token -- do we ever get here?
# else:
# ...
# if current_app.config.get("DEVELOPMENT"):
# # Fall back to a default user if this is not production.
# g.user = UserModel.query.first()
# if not g.user:
# raise ApiError(
# "no_user",
# "You are in development mode, but there are no users in the database. Add one, and it will use it.",
# )
# token_from_user = g.user.encode_auth_token()
# token_info = UserModel.decode_auth_token(token_from_user)
# return token_info
#
# else:
# raise ApiError(
# error_code="no_auth_token",
# message="No authorization token was available.",
# status_code=401,
# )
def validate_scope(token: Any) -> bool:
"""Validate_scope."""
print("validate_scope")
# token = PublicAuthenticationService.refresh_token(token)
# user_info = PublicAuthenticationService.get_user_info_from_public_access_token(token)
# bearer_token = PublicAuthenticationService.get_bearer_token(token)
# permission = PublicAuthenticationService.get_permission_by_basic_token(token)
# permissions = PublicAuthenticationService.get_permissions_by_token_for_resource_and_scope(token)
# introspection = PublicAuthenticationService.introspect_token(basic_token)
# token = AuthenticationService.refresh_token(token)
# user_info = AuthenticationService.get_user_info_from_public_access_token(token)
# bearer_token = AuthenticationService.get_bearer_token(token)
# permission = AuthenticationService.get_permission_by_basic_token(token)
# permissions = AuthenticationService.get_permissions_by_token_for_resource_and_scope(token)
# introspection = AuthenticationService.introspect_token(basic_token)
return True
# def login_api(redirect_url: str = "/v1.0/ui") -> Response:
# """Api_login."""
# # TODO: Fix this! mac 20220801
# # token:dict = PublicAuthenticationService().get_public_access_token(uid, password)
# #
# # return token
# # if uid:
# # sub = f"service:internal::service_id:{uid}"
# # token = encode_auth_token(sub)
# # user_model = UserModel(username=uid,
# # uid=uid,
# # service='internal',
# # name="API User")
# # g.user = user_model
# #
# # g.token = token
# # scope = get_scope(token)
# # return token
# # return {"uid": uid, "sub": uid, "scope": scope}
# return login(redirect_url)
# def login_api_return(code: str, state: str, session_state: str) -> Optional[Response]:
# print("login_api_return")
def encode_auth_token(sub: str, token_type: Optional[str] = None) -> str:
"""Generates the Auth Token.
@ -202,8 +192,8 @@ def encode_auth_token(sub: str, token_type: Optional[str] = None) -> str:
def login(redirect_url: str = "/") -> Response:
"""Login."""
state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(
state = AuthenticationService.generate_state(redirect_url)
login_redirect_url = AuthenticationService().get_login_redirect_url(
state.decode("UTF-8")
)
return redirect(login_redirect_url)
@ -214,13 +204,13 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
state_dict = ast.literal_eval(base64.b64decode(state).decode("utf-8"))
state_redirect_url = state_dict["redirect_url"]
id_token_object = PublicAuthenticationService().get_id_token_object(code)
if "id_token" in id_token_object:
id_token = id_token_object["id_token"]
auth_token_object = AuthenticationService().get_auth_token_object(code)
if "id_token" in auth_token_object:
id_token = auth_token_object["id_token"]
if PublicAuthenticationService.validate_id_token(id_token):
user_info = PublicAuthenticationService.get_user_info_from_id_token(
id_token_object["access_token"]
if AuthenticationService.validate_id_token(id_token):
user_info = AuthenticationService.get_user_info_from_open_id(
auth_token_object["access_token"]
)
if user_info and "error" not in user_info:
user_model = (
@ -250,6 +240,10 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
if user_model:
g.user = user_model.id
g.token = auth_token_object["id_token"]
AuthenticationService.store_refresh_token(
user_model.id, auth_token_object["refresh_token"]
)
# this may eventually get too slow.
# when it does, be careful about backgrounding, because
@ -261,7 +255,7 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
redirect_url = (
f"{state_redirect_url}?"
+ f"access_token={id_token_object['access_token']}&"
+ f"access_token={auth_token_object['access_token']}&"
+ f"id_token={id_token}"
)
return redirect(redirect_url)
@ -283,8 +277,8 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
def login_api() -> Response:
"""Login_api."""
redirect_url = "/v1.0/login_api_return"
state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(
state = AuthenticationService.generate_state(redirect_url)
login_redirect_url = AuthenticationService().get_login_redirect_url(
state.decode("UTF-8"), redirect_url
)
return redirect(login_redirect_url)
@ -295,10 +289,10 @@ def login_api_return(code: str, state: str, session_state: str) -> str:
state_dict = ast.literal_eval(base64.b64decode(state).decode("utf-8"))
state_dict["redirect_url"]
id_token_object = PublicAuthenticationService().get_id_token_object(
auth_token_object = AuthenticationService().get_auth_token_object(
code, "/v1.0/login_api_return"
)
access_token: str = id_token_object["access_token"]
access_token: str = auth_token_object["access_token"]
assert access_token # noqa: S101
return access_token
# return redirect("localhost:7000/v1.0/ui")
@ -309,9 +303,7 @@ def logout(id_token: str, redirect_url: Optional[str]) -> Response:
"""Logout."""
if redirect_url is None:
redirect_url = ""
return PublicAuthenticationService().logout(
redirect_url=redirect_url, id_token=id_token
)
return AuthenticationService().logout(redirect_url=redirect_url, id_token=id_token)
def logout_return() -> Response:

View File

@ -10,8 +10,11 @@ import requests
from flask import current_app
from flask import redirect
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from werkzeug.wrappers import Response
from spiffworkflow_backend.models.refresh_token import RefreshTokenModel
class AuthenticationProviderTypes(enum.Enum):
"""AuthenticationServiceProviders."""
@ -20,13 +23,8 @@ class AuthenticationProviderTypes(enum.Enum):
internal = "internal"
class PublicAuthenticationService:
"""PublicAuthenticationService."""
"""Not sure where/if this ultimately lives.
It uses a separate public open_id client: spiffworkflow-frontend
Used during development to make testing easy.
"""
class AuthenticationService:
"""AuthenticationService."""
@staticmethod
def get_open_id_args() -> tuple:
@ -45,8 +43,8 @@ class PublicAuthenticationService:
)
@classmethod
def get_user_info_from_id_token(cls, token: str) -> dict:
"""This seems to work with basic tokens too."""
def get_user_info_from_open_id(cls, token: str) -> dict:
"""The token is an auth_token."""
(
open_id_server_url,
open_id_client_id,
@ -54,10 +52,6 @@ class PublicAuthenticationService:
open_id_client_secret_key,
) = cls.get_open_id_args()
# backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
# backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
# backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
headers = {"Authorization": f"Bearer {token}"}
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/userinfo"
@ -85,7 +79,8 @@ class PublicAuthenticationService:
status_code=401,
)
def get_backend_url(self) -> str:
@staticmethod
def get_backend_url() -> str:
"""Get_backend_url."""
return str(current_app.config["SPIFFWORKFLOW_BACKEND_URL"])
@ -99,7 +94,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = PublicAuthenticationService.get_open_id_args()
) = AuthenticationService.get_open_id_args()
request_url = (
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/logout?"
+ f"post_logout_redirect_uri={return_redirect_url}&"
@ -123,7 +118,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = PublicAuthenticationService.get_open_id_args()
) = AuthenticationService.get_open_id_args()
return_redirect_url = f"{self.get_backend_url()}{redirect_url}"
login_redirect_url = (
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/auth?"
@ -135,16 +130,16 @@ class PublicAuthenticationService:
)
return login_redirect_url
def get_id_token_object(
def get_auth_token_object(
self, code: str, redirect_url: str = "/v1.0/login_return"
) -> dict:
"""Get_id_token_object."""
"""Get_auth_token_object."""
(
open_id_server_url,
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = PublicAuthenticationService.get_open_id_args()
) = AuthenticationService.get_open_id_args()
backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
@ -162,8 +157,8 @@ class PublicAuthenticationService:
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
response = requests.post(request_url, data=data, headers=headers)
id_token_object: dict = json.loads(response.text)
return id_token_object
auth_token_object: dict = json.loads(response.text)
return auth_token_object
@classmethod
def validate_id_token(cls, id_token: str) -> bool:
@ -211,3 +206,65 @@ class PublicAuthenticationService:
)
return True
@staticmethod
def store_refresh_token(user_id: int, refresh_token: str) -> None:
"""Store_refresh_token."""
refresh_token_model = RefreshTokenModel.query.filter(
RefreshTokenModel.user_id == user_id
).first()
if refresh_token_model:
refresh_token_model.token = refresh_token
else:
refresh_token_model = RefreshTokenModel(
user_id=user_id, token=refresh_token
)
db.session.add(refresh_token_model)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
raise ApiError(
error_code="store_refresh_token_error",
message=f"We could not store the refresh token. Original error is {e}",
) from e
@staticmethod
def get_refresh_token(user_id: int) -> Optional[str]:
"""Get_refresh_token."""
refresh_token_object: RefreshTokenModel = RefreshTokenModel.query.filter(
RefreshTokenModel.user_id == user_id
).first()
assert refresh_token_object # noqa: S101
return refresh_token_object.token
@classmethod
def get_auth_token_from_refresh_token(cls, refresh_token: str) -> dict:
"""Get a new auth_token from a refresh_token."""
(
open_id_server_url,
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = cls.get_open_id_args()
backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {backend_basic_auth.decode('utf-8')}",
}
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": open_id_client_id,
"client_secret": open_id_client_secret_key,
}
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
response = requests.post(request_url, data=data, headers=headers)
auth_token_object: dict = json.loads(response.text)
return auth_token_object

View File

@ -6,6 +6,8 @@ from typing import Union
import jwt
import yaml
from flask import current_app
from flask import g
from flask import request
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from sqlalchemy import text
@ -21,6 +23,10 @@ from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignme
from spiffworkflow_backend.services.user_service import UserService
class PermissionsFileNotSetError(Exception):
"""PermissionsFileNotSetError."""
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@ -47,7 +53,9 @@ class AuthorizationService:
elif permission_assignment.grant_type == "deny":
return False
else:
raise Exception("Unknown grant type")
raise Exception(
f"Unknown grant type: {permission_assignment.grant_type}"
)
return False
@ -72,11 +80,31 @@ class AuthorizationService:
return cls.has_permission(principals, permission, target_uri)
@classmethod
def delete_all_permissions_and_recreate(cls) -> None:
"""Delete_all_permissions_and_recreate."""
for model in [PermissionAssignmentModel, PermissionTargetModel]:
db.session.query(model).delete()
# cascading to principals doesn't seem to work when attempting to delete all so do it like this instead
for group in GroupModel.query.all():
db.session.delete(group)
db.session.commit()
cls.import_permissions_from_yaml_file()
@classmethod
def import_permissions_from_yaml_file(
cls, raise_if_missing_user: bool = False
) -> None:
"""Import_permissions_from_yaml_file."""
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
raise (
PermissionsFileNotSetError(
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME needs to be set in order to import permissions"
)
)
permission_configs = None
with open(current_app.config["PERMISSIONS_FILE_FULLPATH"]) as file:
permission_configs = yaml.safe_load(file)
@ -171,6 +199,88 @@ class AuthorizationService:
db.session.commit()
return permission_assignment
@classmethod
def should_disable_auth_for_request(cls) -> bool:
"""Should_disable_auth_for_request."""
authentication_exclusion_list = ["status", "authentication_callback"]
if request.method == "OPTIONS":
return True
# if the endpoint does not exist then let the system 404
#
# for some reason this runs before connexion checks if the
# endpoint exists.
if not request.endpoint:
return True
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
and api_view_function.__name__.startswith("login")
or api_view_function.__name__.startswith("logout")
or api_view_function.__name__ in authentication_exclusion_list
):
return True
return False
@classmethod
def get_permission_from_http_method(cls, http_method: str) -> Optional[str]:
"""Get_permission_from_request_method."""
request_method_mapper = {
"POST": "create",
"GET": "read",
"PUT": "update",
"DELETE": "delete",
}
if http_method in request_method_mapper:
return request_method_mapper[http_method]
return None
# TODO: we can add the before_request to the blueprint
# directly when we switch over from connexion routes
# to blueprint routes
# @process_api_blueprint.before_request
@classmethod
def check_for_permission(cls) -> None:
"""Check_for_permission."""
if cls.should_disable_auth_for_request():
return None
authorization_exclusion_list = ["permissions_check"]
if not hasattr(g, "user"):
raise ApiError(
error_code="user_not_logged_in",
message="User is not logged in. Please log in",
status_code=401,
)
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
and api_view_function.__name__ in authorization_exclusion_list
):
return None
permission_string = cls.get_permission_from_http_method(request.method)
if permission_string:
has_permission = AuthorizationService.user_has_permission(
user=g.user,
permission=permission_string,
target_uri=request.path,
)
if has_permission:
return None
raise ApiError(
error_code="unauthorized",
message="User is not authorized to perform requested action.",
status_code=403,
)
# def refresh_token(self, token: str) -> str:
# """Refresh_token."""
# # if isinstance(token, str):

View File

@ -889,9 +889,6 @@ class ProcessInstanceProcessor:
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
if save:
self.save()
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we

View File

@ -299,3 +299,17 @@ class UserService:
ugam = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
db.session.add(ugam)
db.session.commit()
@staticmethod
def get_user_by_service_and_service_id(
service: str, service_id: str
) -> Optional[UserModel]:
"""Get_user_by_service_and_service_id."""
user: UserModel = (
UserModel.query.filter(UserModel.service == service)
.filter(UserModel.service_id == service_id)
.first()
)
if user:
return user
return None

View File

@ -15,6 +15,8 @@ from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from werkzeug.test import TestResponse # type: ignore
from spiffworkflow_backend.models.permission_assignment import Permission
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -92,6 +94,7 @@ class BaseTest:
exception_notification_addresses: Optional[list] = None,
primary_process_id: Optional[str] = None,
primary_file_name: Optional[str] = None,
user: Optional[UserModel] = None,
) -> TestResponse:
"""Create_process_model."""
process_model_service = ProcessModelService()
@ -121,7 +124,9 @@ class BaseTest:
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
exception_notification_addresses=exception_notification_addresses,
)
if user is None:
user = self.find_or_create_user()
response = client.post(
"/v1.0/process-models",
content_type="application/json",
@ -139,6 +144,7 @@ class BaseTest:
process_model: Optional[ProcessModelInfo] = None,
file_name: str = "random_fact.svg",
file_data: bytes = b"abcdef",
user: Optional[UserModel] = None,
) -> Any:
"""Test_create_spec_file."""
if process_model is None:
@ -146,6 +152,7 @@ class BaseTest:
process_model_id, process_group_id=process_group_id
)
data = {"file": (io.BytesIO(file_data), file_name)}
if user is None:
user = self.find_or_create_user()
response = client.post(
f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}/files",
@ -194,7 +201,7 @@ class BaseTest:
# @staticmethod
# def get_public_access_token(username: str, password: str) -> dict:
# """Get_public_access_token."""
# public_access_token = PublicAuthenticationService().get_public_access_token(
# public_access_token = AuthenticationService().get_public_access_token(
# username, password
# )
# return public_access_token
@ -218,6 +225,46 @@ class BaseTest:
db.session.commit()
return process_instance
@classmethod
def create_user_with_permission(
cls,
username: str,
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Create_user_with_permission."""
user = BaseTest.find_or_create_user(username=username)
return cls.add_permissions_to_user(
user, target_uri=target_uri, permission_names=permission_names
)
@classmethod
def add_permissions_to_user(
cls,
user: UserModel,
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Add_permissions_to_user."""
permission_target = PermissionTargetModel.query.filter_by(
uri=target_uri
).first()
if permission_target is None:
permission_target = PermissionTargetModel(uri=target_uri)
db.session.add(permission_target)
db.session.commit()
if permission_names is None:
permission_names = [member.name for member in Permission]
for permission in permission_names:
AuthorizationService.create_permission_for_principal(
principal=user.principal,
permission_target=permission_target,
permission=permission,
)
return user
@staticmethod
def logged_in_headers(
user: UserModel, _redirect_url: str = "http://some/frontend/url"

View File

@ -5,7 +5,7 @@ import base64
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.services.authentication_service import (
PublicAuthenticationService,
AuthenticationService,
)
@ -15,7 +15,7 @@ class TestAuthentication(BaseTest):
def test_get_login_state(self) -> None:
"""Test_get_login_state."""
redirect_url = "http://example.com/"
state = PublicAuthenticationService.generate_state(redirect_url)
state = AuthenticationService.generate_state(redirect_url)
state_dict = ast.literal_eval(base64.b64decode(state).decode("utf-8"))
assert isinstance(state_dict, dict)
@ -24,9 +24,9 @@ class TestAuthentication(BaseTest):
# def test_get_login_redirect_url(self):
# redirect_url = "http://example.com/"
# state = PublicAuthenticationService.generate_state(redirect_url)
# state = AuthenticationService.generate_state(redirect_url)
# with current_app.app_context():
# login_redirect_url = PublicAuthenticationService().get_login_redirect_url(state.decode("UTF-8"))
# login_redirect_url = AuthenticationService().get_login_redirect_url(state.decode("UTF-8"))
# print("test_get_login_redirect_url")
# print("test_get_login_redirect_url")

View File

@ -9,7 +9,7 @@ class TestAuthorization(BaseTest):
# """Test_get_bearer_token."""
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# public_access_token = self.get_public_access_token(user_id, user_id)
# bearer_token = PublicAuthenticationService.get_bearer_token(public_access_token)
# bearer_token = AuthenticationService.get_bearer_token(public_access_token)
# assert isinstance(public_access_token, str)
# assert isinstance(bearer_token, dict)
# assert "access_token" in bearer_token
@ -25,7 +25,7 @@ class TestAuthorization(BaseTest):
# """Test_get_user_info_from_public_access_token."""
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# public_access_token = self.get_public_access_token(user_id, user_id)
# user_info = PublicAuthenticationService.get_user_info_from_id_token(
# user_info = AuthenticationService.get_user_info_from_id_token(
# public_access_token
# )
# assert "sub" in user_info
@ -46,7 +46,7 @@ class TestAuthorization(BaseTest):
# ) = self.get_keycloak_constants(app)
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# basic_token = self.get_public_access_token(user_id, user_id)
# introspection = PublicAuthenticationService.introspect_token(basic_token)
# introspection = AuthenticationService.introspect_token(basic_token)
# assert isinstance(introspection, dict)
# assert introspection["typ"] == "Bearer"
# assert introspection["preferred_username"] == user_id
@ -80,7 +80,7 @@ class TestAuthorization(BaseTest):
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# output[user_id] = {}
# basic_token = self.get_public_access_token(user_id, user_id)
# permissions = PublicAuthenticationService.get_permission_by_basic_token(
# permissions = AuthenticationService.get_permission_by_basic_token(
# basic_token
# )
# if isinstance(permissions, list):
@ -136,7 +136,7 @@ class TestAuthorization(BaseTest):
# for resource in resources:
# output[user_id][resource] = {}
# for scope in "instantiate", "read", "update", "delete":
# auth_status = PublicAuthenticationService.get_auth_status_for_resource_and_scope_by_token(
# auth_status = AuthenticationService.get_auth_status_for_resource_and_scope_by_token(
# basic_token, resource, scope
# )
# output[user_id][resource][scope] = auth_status
@ -152,7 +152,7 @@ class TestAuthorization(BaseTest):
# for resource in resource_names:
# output[user_id][resource] = {}
# for scope in "instantiate", "read", "update", "delete":
# permissions = PublicAuthenticationService.get_permissions_by_token_for_resource_and_scope(
# permissions = AuthenticationService.get_permissions_by_token_for_resource_and_scope(
# basic_token, resource, scope
# )
# output[user_id][resource][scope] = permissions

View File

@ -3,18 +3,23 @@ from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.user import UserModel
class TestLoggingService(BaseTest):
"""Test logging service."""
def test_logging_service_spiff_logger(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_process_instance_run."""
process_group_id = "test_logging_spiff_logger"
process_model_id = "simple_script"
user = self.find_or_create_user()
headers = self.logged_in_headers(user)
headers = self.logged_in_headers(with_super_admin_user)
response = self.create_process_instance(
client, process_group_id, process_model_id, headers
)
@ -22,13 +27,13 @@ class TestLoggingService(BaseTest):
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run",
headers=self.logged_in_headers(user),
headers=headers,
)
assert response.status_code == 200
log_response = client.get(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/logs",
headers=self.logged_in_headers(user),
headers=headers,
)
assert log_response.status_code == 200
assert log_response.json

View File

@ -48,6 +48,7 @@ class SecretServiceTestHelpers(BaseTest):
process_model_id=self.test_process_model_id,
process_model_display_name=self.test_process_model_display_name,
process_model_description=self.test_process_model_description,
user=user,
)
process_model_info = ProcessModelService().get_process_model(
self.test_process_model_id, self.test_process_group_id
@ -58,118 +59,153 @@ class SecretServiceTestHelpers(BaseTest):
class TestSecretService(SecretServiceTestHelpers):
"""TestSecretService."""
def test_add_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
def test_add_secret(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_add_secret."""
user = self.find_or_create_user()
test_secret = self.add_test_secret(user)
test_secret = self.add_test_secret(with_super_admin_user)
assert test_secret is not None
assert test_secret.key == self.test_key
assert test_secret.value == self.test_value
assert test_secret.creator_user_id == user.id
assert test_secret.creator_user_id == with_super_admin_user.id
def test_add_secret_duplicate_key_fails(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_add_secret_duplicate_key_fails."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError) as ae:
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
assert ae.value.error_code == "create_secret_error"
def test_get_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
def test_get_secret(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_get_secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secret = SecretService().get_secret(self.test_key)
assert secret is not None
assert secret.value == self.test_value
def test_get_secret_bad_key_fails(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_get_secret_bad_service."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError):
SecretService().get_secret("bad_key")
def test_update_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test update secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secret = SecretService.get_secret(self.test_key)
assert secret
assert secret.value == self.test_value
SecretService.update_secret(self.test_key, "new_secret_value", user.id)
SecretService.update_secret(
self.test_key, "new_secret_value", with_super_admin_user.id
)
new_secret = SecretService.get_secret(self.test_key)
assert new_secret
assert new_secret.value == "new_secret_value" # noqa: S105
def test_update_secret_bad_user_fails(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_update_secret_bad_user."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError) as ae:
SecretService.update_secret(
self.test_key, "new_secret_value", user.id + 1
self.test_key, "new_secret_value", with_super_admin_user.id + 1
) # noqa: S105
assert (
ae.value.message
== f"User: {user.id+1} cannot update the secret with key : test_key"
== f"User: {with_super_admin_user.id+1} cannot update the secret with key : test_key"
)
def test_update_secret_bad_secret_fails(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_update_secret_bad_secret_fails."""
user = self.find_or_create_user()
secret = self.add_test_secret(user)
secret = self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError) as ae:
SecretService.update_secret(secret.key + "x", "some_new_value", user.id)
SecretService.update_secret(
secret.key + "x", "some_new_value", with_super_admin_user.id
)
assert "Resource does not exist" in ae.value.message
assert ae.value.error_code == "update_secret_error"
def test_delete_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test delete secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secrets = SecretModel.query.all()
assert len(secrets) == 1
assert secrets[0].creator_user_id == user.id
SecretService.delete_secret(self.test_key, user.id)
assert secrets[0].creator_user_id == with_super_admin_user.id
SecretService.delete_secret(self.test_key, with_super_admin_user.id)
secrets = SecretModel.query.all()
assert len(secrets) == 0
def test_delete_secret_bad_user_fails(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_delete_secret_bad_user."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError) as ae:
SecretService.delete_secret(self.test_key, user.id + 1)
SecretService.delete_secret(self.test_key, with_super_admin_user.id + 1)
assert (
f"User: {user.id+1} cannot delete the secret with key" in ae.value.message
f"User: {with_super_admin_user.id+1} cannot delete the secret with key"
in ae.value.message
)
def test_delete_secret_bad_secret_fails(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_delete_secret_bad_secret_fails."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
with pytest.raises(ApiError) as ae:
SecretService.delete_secret(self.test_key + "x", user.id)
SecretService.delete_secret(self.test_key + "x", with_super_admin_user.id)
assert "Resource does not exist" in ae.value.message
@ -177,19 +213,22 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
"""TestSecretServiceApi."""
def test_add_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_add_secret."""
user = self.find_or_create_user()
secret_model = SecretModel(
key=self.test_key,
value=self.test_value,
creator_user_id=user.id,
creator_user_id=with_super_admin_user.id,
)
data = json.dumps(SecretModelSchema().dump(secret_model))
response: TestResponse = client.post(
"/v1.0/secrets",
headers=self.logged_in_headers(user),
headers=self.logged_in_headers(with_super_admin_user),
content_type="application/json",
data=data,
)
@ -199,17 +238,20 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
assert key in secret.keys()
assert secret["key"] == self.test_key
assert secret["value"] == self.test_value
assert secret["creator_user_id"] == user.id
assert secret["creator_user_id"] == with_super_admin_user.id
def test_get_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test get secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secret_response = client.get(
f"/v1.0/secrets/{self.test_key}",
headers=self.logged_in_headers(user),
headers=self.logged_in_headers(with_super_admin_user),
)
assert secret_response
assert secret_response.status_code == 200
@ -217,20 +259,25 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
assert secret_response.json["value"] == self.test_value
def test_update_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_update_secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secret: Optional[SecretModel] = SecretService.get_secret(self.test_key)
assert secret
assert secret.value == self.test_value
secret_model = SecretModel(
key=self.test_key, value="new_secret_value", creator_user_id=user.id
key=self.test_key,
value="new_secret_value",
creator_user_id=with_super_admin_user.id,
)
response = client.put(
f"/v1.0/secrets/{self.test_key}",
headers=self.logged_in_headers(user),
headers=self.logged_in_headers(with_super_admin_user),
content_type="application/json",
data=json.dumps(SecretModelSchema().dump(secret_model)),
)
@ -242,42 +289,61 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
assert secret_model.value == "new_secret_value"
def test_delete_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test delete secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
self.add_test_secret(with_super_admin_user)
secret = SecretService.get_secret(self.test_key)
assert secret
assert secret.value == self.test_value
secret_response = client.delete(
f"/v1.0/secrets/{self.test_key}",
headers=self.logged_in_headers(user),
headers=self.logged_in_headers(with_super_admin_user),
)
assert secret_response.status_code == 200
with pytest.raises(ApiError):
secret = SecretService.get_secret(self.test_key)
def test_delete_secret_bad_user(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_delete_secret_bad_user."""
user_1 = self.find_or_create_user()
user_2 = self.find_or_create_user("test_user_2")
self.add_test_secret(user_1)
# ensure user has permissions to delete the given secret
self.add_permissions_to_user(
user_2,
target_uri=f"/v1.0/secrets/{self.test_key}",
permission_names=["delete"],
)
secret_response = client.delete(
f"/v1.0/secrets/{self.test_key}",
headers=self.logged_in_headers(user_2),
)
assert secret_response.status_code == 401
assert secret_response.json
assert secret_response.json["error_code"] == "delete_secret_error"
def test_delete_secret_bad_key(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test delete secret."""
user = self.find_or_create_user()
secret_response = client.delete(
"/v1.0/secrets/bad_secret_key",
headers=self.logged_in_headers(user),
headers=self.logged_in_headers(with_super_admin_user),
)
assert secret_response.status_code == 404

View File

@ -13,10 +13,10 @@ from spiffworkflow_backend.models.permission_target import PermissionTargetModel
class TestPermissionTarget(BaseTest):
"""TestPermissionTarget."""
def test_asterisk_must_go_at_the_end_of_uri(
def test_wildcard_must_go_at_the_end_of_uri(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_asterisk_must_go_at_the_end_of_uri."""
"""Test_wildcard_must_go_at_the_end_of_uri."""
permission_target = PermissionTargetModel(uri="/test_group/%")
db.session.add(permission_target)
db.session.commit()
@ -30,3 +30,13 @@ class TestPermissionTarget(BaseTest):
assert (
str(exception.value) == "Wildcard must appear at end: /test_group/%/model"
)
def test_can_change_asterisk_to_percent_on_creation(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_can_change_asterisk_to_percent_on_creation."""
permission_target = PermissionTargetModel(uri="/test_group/*")
db.session.add(permission_target)
db.session.commit()
assert isinstance(permission_target.id, int)
assert permission_target.uri == "/test_group/%"