pyl w/ burnettk

This commit is contained in:
jasquat 2022-12-20 15:47:30 -05:00
parent 5a6e181a99
commit 92258c6f9a
17 changed files with 189 additions and 156 deletions

View File

@ -1,5 +1,3 @@
from __future__ import with_statement
import logging import logging
from logging.config import fileConfig from logging.config import fileConfig

View File

@ -27,7 +27,9 @@ class GroupModel(FlaskBpmnGroupModel):
identifier = db.Column(db.String(255)) identifier = db.Column(db.String(255))
user_group_assignments = relationship("UserGroupAssignmentModel", cascade="delete") user_group_assignments = relationship("UserGroupAssignmentModel", cascade="delete")
user_group_assignments_waiting = relationship("UserGroupAssignmentWaitingModel", cascade="delete") user_group_assignments_waiting = relationship(
"UserGroupAssignmentWaitingModel", cascade="delete"
)
users = relationship( # type: ignore users = relationship( # type: ignore
"UserModel", "UserModel",
viewonly=True, viewonly=True,

View File

@ -1,22 +1,15 @@
"""User.""" """User."""
from __future__ import annotations from __future__ import annotations
from typing import Any
import jwt import jwt
import marshmallow import marshmallow
from flask import current_app from flask import current_app
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from marshmallow import Schema from marshmallow import Schema
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.services.authentication_service import (
AuthenticationProviderTypes,
)
class UserNotFoundError(Exception): class UserNotFoundError(Exception):
@ -29,8 +22,12 @@ class UserModel(SpiffworkflowBaseDBModel):
__tablename__ = "user" __tablename__ = "user"
__table_args__ = (db.UniqueConstraint("service", "service_id", name="service_key"),) __table_args__ = (db.UniqueConstraint("service", "service_id", name="service_key"),)
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), nullable=False, unique=True) # should always be a unique value username = db.Column(
service = db.Column(db.String(50), nullable=False, unique=False) # not 'openid' -- google, aws db.String(255), nullable=False, unique=True
) # should always be a unique value
service = db.Column(
db.String(50), nullable=False, unique=False
) # not 'openid' -- google, aws
service_id = db.Column(db.String(255), nullable=False, unique=False) service_id = db.Column(db.String(255), nullable=False, unique=False)
display_name = db.Column(db.String(255)) display_name = db.Column(db.String(255))
email = db.Column(db.String(255)) email = db.Column(db.String(255))

View File

@ -5,16 +5,19 @@ from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
class UserGroupAssignmentWaitingModel(SpiffworkflowBaseDBModel): class UserGroupAssignmentWaitingModel(SpiffworkflowBaseDBModel):
"""UserGroupAssignmentsWaitingModel - When a user is assinged to a group, but that username does not exist, """UserGroupAssignmentsWaitingModel - When a user is assinged to a group, but that username does not exist,
we cache it here to be applied in the event the user does log into the system. we cache it here to be applied in the event the user does log into the system.
""" """
MATCH_ALL_USERS = "*" MATCH_ALL_USERS = "*"
__tablename__ = "user_group_assignment_waiting" __tablename__ = "user_group_assignment_waiting"
__table_args__ = ( __table_args__ = (
db.UniqueConstraint("username", "group_id", name="user_group_assignment_staged_unique"), db.UniqueConstraint(
"username", "group_id", name="user_group_assignment_staged_unique"
),
) )
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
@ -24,6 +27,7 @@ class UserGroupAssignmentWaitingModel(SpiffworkflowBaseDBModel):
group = relationship("GroupModel", overlaps="groups,user_group_assignment_waiting,users") # type: ignore group = relationship("GroupModel", overlaps="groups,user_group_assignment_waiting,users") # type: ignore
def is_match_all(self): def is_match_all(self):
"""Is_match_all."""
if self.username == self.MATCH_ALL_USERS: if self.username == self.MATCH_ALL_USERS:
return True return True
return False return False

View File

@ -111,7 +111,7 @@ def token() -> dict:
"iat": time.time(), "iat": time.time(),
"exp": time.time() + 86400, # Expire after a day. "exp": time.time() + 86400, # Expire after a day.
"sub": user_name, "sub": user_name,
"email": user_details['email'], "email": user_details["email"],
"preferred_username": user_details.get("preferred_username", user_name), "preferred_username": user_details.get("preferred_username", user_name),
}, },
client_secret, client_secret,

View File

@ -1,19 +1,16 @@
"""Get_env.""" """Get_env."""
from typing import Any from typing import Any
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.group import GroupNotFoundError
from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext, ScriptAttributesContext,
) )
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.group_service import GroupService from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService
# add_permission("read", "test/*", "Editors") # add_permission("read", "test/*", "Editors")
class AddPermission(Script): class AddPermission(Script):
"""AddUserToGroup.""" """AddUserToGroup."""
@ -35,4 +32,4 @@ class AddPermission(Script):
target = AuthorizationService.find_or_create_permission_target(uri) target = AuthorizationService.find_or_create_permission_target(uri)
AuthorizationService.create_permission_for_principal( AuthorizationService.create_permission_for_principal(
group.principal, target, allowed_permission group.principal, target, allowed_permission
) )

View File

@ -1,13 +1,10 @@
"""Get_env.""" """Get_env."""
from typing import Any from typing import Any
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.group import GroupNotFoundError
from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext, ScriptAttributesContext,
) )
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.group_service import GroupService from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
@ -35,4 +32,4 @@ class AddUserToGroup(Script):
if user: if user:
UserService.add_user_to_group(user, group) UserService.add_user_to_group(user, group)
else: else:
UserService.add_waiting_group_assignment(username, group) UserService.add_waiting_group_assignment(username, group)

View File

@ -1,16 +1,11 @@
"""Get_env.""" """Get_env."""
from typing import Any from typing import Any
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.group import GroupNotFoundError
from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext, ScriptAttributesContext,
) )
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
class ClearPermissions(Script): class ClearPermissions(Script):
@ -27,4 +22,4 @@ class ClearPermissions(Script):
**kwargs: Any, **kwargs: Any,
) -> Any: ) -> Any:
"""Run.""" """Run."""
AuthorizationService.delete_all_permissions() AuthorizationService.delete_all_permissions()

View File

@ -219,6 +219,7 @@ class AuthorizationService:
@classmethod @classmethod
def find_or_create_permission_target(cls, uri): def find_or_create_permission_target(cls, uri):
"""Find_or_create_permission_target."""
uri_with_percent = re.sub(r"\*", "%", uri) uri_with_percent = re.sub(r"\*", "%", uri)
permission_target = PermissionTargetModel.query.filter_by( permission_target = PermissionTargetModel.query.filter_by(
uri=uri_with_percent uri=uri_with_percent
@ -452,9 +453,9 @@ class AuthorizationService:
@classmethod @classmethod
def create_user_from_sign_in(cls, user_info: dict) -> UserModel: def create_user_from_sign_in(cls, user_info: dict) -> UserModel:
"""Create_user_from_sign_in.""" """Create_user_from_sign_in."""
"""name, family_name, given_name, middle_name, nickname, preferred_username,""" """Name, family_name, given_name, middle_name, nickname, preferred_username,"""
"""profile, picture, website, gender, birthdate, zoneinfo, locale, and updated_at. """ """Profile, picture, website, gender, birthdate, zoneinfo, locale, and updated_at. """
"""email""" """Email."""
is_new_user = False is_new_user = False
user_model = ( user_model = (
UserModel.query.filter(UserModel.service == user_info["iss"]) UserModel.query.filter(UserModel.service == user_info["iss"])
@ -483,10 +484,10 @@ class AuthorizationService:
service=user_info["iss"], service=user_info["iss"],
service_id=user_info["sub"], service_id=user_info["sub"],
email=email, email=email,
display_name = display_name display_name=display_name,
) )
else : else:
# Update with the latest information # Update with the latest information
user_model.username = username user_model.username = username
user_model.email = email user_model.email = email

View File

@ -13,7 +13,9 @@ from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.principal import PrincipalModel from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
from spiffworkflow_backend.models.user_group_assignment_waiting import UserGroupAssignmentWaitingModel from spiffworkflow_backend.models.user_group_assignment_waiting import (
UserGroupAssignmentWaitingModel,
)
class UserService: class UserService:
@ -26,7 +28,7 @@ class UserService:
service: str, service: str,
service_id: str, service_id: str,
email: Optional[str] = "", email: Optional[str] = "",
display_name: Optional[str] = "" display_name: Optional[str] = "",
) -> UserModel: ) -> UserModel:
"""Create_user.""" """Create_user."""
user_model: Optional[UserModel] = ( user_model: Optional[UserModel] = (
@ -43,7 +45,7 @@ class UserService:
service=service, service=service,
service_id=service_id, service_id=service_id,
email=email, email=email,
display_name=display_name display_name=display_name,
) )
db.session.add(user_model) db.session.add(user_model)
@ -128,7 +130,12 @@ class UserService:
@classmethod @classmethod
def add_user_to_group(cls, user: UserModel, group: GroupModel) -> None: def add_user_to_group(cls, user: UserModel, group: GroupModel) -> None:
"""Add_user_to_group.""" """Add_user_to_group."""
exists = UserGroupAssignmentModel().query.filter_by(user_id=user.id).filter_by(group_id=group.id).count() exists = (
UserGroupAssignmentModel()
.query.filter_by(user_id=user.id)
.filter_by(group_id=group.id)
.count()
)
if not exists: if not exists:
ugam = UserGroupAssignmentModel(user_id=user.id, group_id=group.id) ugam = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
db.session.add(ugam) db.session.add(ugam)
@ -136,9 +143,17 @@ class UserService:
@classmethod @classmethod
def add_waiting_group_assignment(cls, username: str, group: GroupModel) -> None: def add_waiting_group_assignment(cls, username: str, group: GroupModel) -> None:
wugam = UserGroupAssignmentWaitingModel().query.filter_by(username=username).filter_by(group_id=group.id).first() """Add_waiting_group_assignment."""
wugam = (
UserGroupAssignmentWaitingModel()
.query.filter_by(username=username)
.filter_by(group_id=group.id)
.first()
)
if not wugam: if not wugam:
wugam = UserGroupAssignmentWaitingModel(username=username, group_id=group.id) wugam = UserGroupAssignmentWaitingModel(
username=username, group_id=group.id
)
db.session.add(wugam) db.session.add(wugam)
db.session.commit() db.session.commit()
if wugam.is_match_all(): if wugam.is_match_all():
@ -147,13 +162,23 @@ class UserService:
@classmethod @classmethod
def apply_waiting_group_assignments(cls, user: UserModel) -> None: def apply_waiting_group_assignments(cls, user: UserModel) -> None:
waiting = UserGroupAssignmentWaitingModel().query.\ """Apply_waiting_group_assignments."""
filter(UserGroupAssignmentWaitingModel.username == user.username).all() waiting = (
UserGroupAssignmentWaitingModel()
.query.filter(UserGroupAssignmentWaitingModel.username == user.username)
.all()
)
for assignment in waiting: for assignment in waiting:
cls.add_user_to_group(user, assignment.group) cls.add_user_to_group(user, assignment.group)
db.session.delete(assignment) db.session.delete(assignment)
wildcard = UserGroupAssignmentWaitingModel().query.\ wildcard = (
filter(UserGroupAssignmentWaitingModel.username == UserGroupAssignmentWaitingModel.MATCH_ALL_USERS).all() UserGroupAssignmentWaitingModel()
.query.filter(
UserGroupAssignmentWaitingModel.username
== UserGroupAssignmentWaitingModel.MATCH_ALL_USERS
)
.all()
)
for assignment in wildcard: for assignment in wildcard:
cls.add_user_to_group(user, assignment.group) cls.add_user_to_group(user, assignment.group)
db.session.commit() db.session.commit()

View File

@ -1,6 +1,5 @@
"""Test_authentication.""" """Test_authentication."""
import base64 import base64
import time
import jwt import jwt
from flask import Flask from flask import Flask
@ -48,8 +47,8 @@ class TestFlaskOpenId(BaseTest):
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_get_token."""
code = ("testadmin1:1234123412341234") code = "testadmin1:1234123412341234"
"""It should be possible to get a token.""" """It should be possible to get a token."""
backend_basic_auth_string = code backend_basic_auth_string = code
@ -67,11 +66,12 @@ class TestFlaskOpenId(BaseTest):
response = client.post("/openid/token", data=data, headers=headers) response = client.post("/openid/token", data=data, headers=headers)
assert response assert response
assert response.is_json assert response.is_json
assert 'access_token' in response.json assert "access_token" in response.json
assert 'id_token' in response.json assert "id_token" in response.json
assert 'refresh_token' in response.json assert "refresh_token" in response.json
decoded_token = jwt.decode(response.json['id_token'], options={"verify_signature": False})
assert 'iss' in decoded_token
assert 'email' in decoded_token
decoded_token = jwt.decode(
response.json["id_token"], options={"verify_signature": False}
)
assert "iss" in decoded_token
assert "email" in decoded_token

View File

@ -1,30 +1,22 @@
"""Test_get_localtime.""" """Test_get_localtime."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import ScriptAttributesContext
from spiffworkflow_backend.scripts.add_permission import AddPermission
from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.group_service import GroupService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.models.permission_target import PermissionTargetModel
ProcessInstanceProcessor, from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
) )
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.add_permission import AddPermission
class TestAddPermission(BaseTest): class TestAddPermission(BaseTest):
"""TestAddPermission.""" """TestAddPermission."""
def test_can_add_permission ( def test_can_add_permission(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
@ -32,7 +24,7 @@ class TestAddPermission(BaseTest):
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_can_get_members_of_a_group.""" """Test_can_get_members_of_a_group."""
test_user = self.find_or_create_user("test_user") self.find_or_create_user("test_user")
# now that we have everything, try to clear it out... # now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext( script_attributes_context = ScriptAttributesContext(
@ -42,20 +34,27 @@ class TestAddPermission(BaseTest):
process_model_identifier="my_test_user", process_model_identifier="my_test_user",
) )
group = GroupModel.query.filter(GroupModel.identifier == 'my_test_group').first() group = GroupModel.query.filter(
permission_target = PermissionTargetModel.query.filter(PermissionTargetModel.uri == '/test_add_permission/%').first() GroupModel.identifier == "my_test_group"
assert(group is None) ).first()
assert(permission_target is None) permission_target = PermissionTargetModel.query.filter(
PermissionTargetModel.uri == "/test_add_permission/%"
).first()
assert group is None
assert permission_target is None
result = AddPermission().run( result = AddPermission().run(
script_attributes_context, script_attributes_context, "read", "/test_add_permission/*", "my_test_group"
"read",
"/test_add_permission/*",
"my_test_group"
) )
group = GroupModel.query.filter(GroupModel.identifier == 'my_test_group').first() group = GroupModel.query.filter(
permission_target = PermissionTargetModel.query.filter(PermissionTargetModel.uri == '/test_add_permission/%').first() GroupModel.identifier == "my_test_group"
permission_assignments = PermissionAssignmentModel.query.filter(PermissionAssignmentModel.principal_id == group.principal.id).all() ).first()
assert(group is not None) permission_target = PermissionTargetModel.query.filter(
assert(permission_target is not None) PermissionTargetModel.uri == "/test_add_permission/%"
assert(len(permission_assignments) == 1) ).first()
permission_assignments = PermissionAssignmentModel.query.filter(
PermissionAssignmentModel.principal_id == group.principal.id
).all()
assert group is not None
assert permission_target is not None
assert len(permission_assignments) == 1

View File

@ -2,19 +2,17 @@
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from spiffworkflow_backend.models.script_attributes_context import ScriptAttributesContext
from spiffworkflow_backend.models.user_group_assignment_waiting import UserGroupAssignmentWaitingModel
from spiffworkflow_backend.scripts.add_user_to_group import AddUserToGroup
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.script_attributes_context import (
from spiffworkflow_backend.services.process_instance_processor import ( ScriptAttributesContext,
ProcessInstanceProcessor,
) )
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment_waiting import (
UserGroupAssignmentWaitingModel,
)
from spiffworkflow_backend.scripts.add_user_to_group import AddUserToGroup
class TestAddUserToGroup(BaseTest): class TestAddUserToGroup(BaseTest):
@ -38,11 +36,9 @@ class TestAddUserToGroup(BaseTest):
process_model_identifier="my_test_user", process_model_identifier="my_test_user",
) )
result = AddUserToGroup().run( result = AddUserToGroup().run(
script_attributes_context, script_attributes_context, my_user.username, my_group.identifier
my_user.username,
my_group.identifier
) )
assert(my_user in my_group.users) assert my_user in my_group.users
def test_can_add_non_existent_user_to_non_existent_group( def test_can_add_non_existent_user_to_non_existent_group(
self, self,
@ -51,6 +47,7 @@ class TestAddUserToGroup(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_can_add_non_existent_user_to_non_existent_group."""
script_attributes_context = ScriptAttributesContext( script_attributes_context = ScriptAttributesContext(
task=None, task=None,
environment_identifier="testing", environment_identifier="testing",
@ -58,11 +55,15 @@ class TestAddUserToGroup(BaseTest):
process_model_identifier="my_test_user", process_model_identifier="my_test_user",
) )
result = AddUserToGroup().run( result = AddUserToGroup().run(
script_attributes_context, script_attributes_context, "dan@sartography.com", "competent-joes"
"dan@sartography.com",
"competent-joes"
) )
my_group = GroupModel.query.filter(GroupModel.identifier == "competent-joes").first() my_group = GroupModel.query.filter(
assert (my_group is not None) GroupModel.identifier == "competent-joes"
waiting_assignments = UserGroupAssignmentWaitingModel().query.filter_by(username="dan@sartography.com").first() ).first()
assert (waiting_assignments is not None) assert my_group is not None
waiting_assignments = (
UserGroupAssignmentWaitingModel()
.query.filter_by(username="dan@sartography.com")
.first()
)
assert waiting_assignments is not None

View File

@ -2,27 +2,24 @@
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import ScriptAttributesContext from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.group_service import GroupService from spiffworkflow_backend.services.group_service import GroupService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
class TestDeletePermissions(BaseTest): class TestDeletePermissions(BaseTest):
"""TestGetGroupMembers.""" """TestGetGroupMembers."""
def test_can_delete_members ( def test_can_delete_members(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
@ -30,12 +27,12 @@ class TestDeletePermissions(BaseTest):
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_can_get_members_of_a_group.""" """Test_can_get_members_of_a_group."""
initiator_user = self.find_or_create_user("initiator_user") self.find_or_create_user("initiator_user")
testuser1 = self.find_or_create_user("testuser1") testuser1 = self.find_or_create_user("testuser1")
testuser2 = self.find_or_create_user("testuser2") testuser2 = self.find_or_create_user("testuser2")
testuser3 = self.find_or_create_user("testuser3") testuser3 = self.find_or_create_user("testuser3")
group_a = GroupService.find_or_create_group('groupA') group_a = GroupService.find_or_create_group("groupA")
group_b = GroupService.find_or_create_group('groupB') group_b = GroupService.find_or_create_group("groupB")
db.session.add(group_a) db.session.add(group_a)
db.session.add(group_b) db.session.add(group_b)
db.session.commit() db.session.commit()
@ -43,12 +40,12 @@ class TestDeletePermissions(BaseTest):
UserService.add_user_to_group(testuser2, group_a) UserService.add_user_to_group(testuser2, group_a)
UserService.add_user_to_group(testuser3, group_b) UserService.add_user_to_group(testuser3, group_b)
target = PermissionTargetModel('test/*') target = PermissionTargetModel("test/*")
db.session.add(target) db.session.add(target)
db.session.commit() db.session.commit()
AuthorizationService.create_permission_for_principal(group_a.principal, AuthorizationService.create_permission_for_principal(
target, group_a.principal, target, "read"
"read") )
# now that we have everything, try to clear it out... # now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext( script_attributes_context = ScriptAttributesContext(
task=None, task=None,
@ -56,9 +53,7 @@ class TestDeletePermissions(BaseTest):
process_instance_id=1, process_instance_id=1,
process_model_identifier="my_test_user", process_model_identifier="my_test_user",
) )
result = ClearPermissions().run( result = ClearPermissions().run(script_attributes_context)
script_attributes_context
)
groups = GroupModel.query.all() groups = GroupModel.query.all()
assert(0 == len(groups)) assert 0 == len(groups)

View File

@ -134,7 +134,12 @@ class TestAuthorizationService(BaseTest):
human_task.task_name, processor.bpmn_process_instance human_task.task_name, processor.bpmn_process_instance
) )
finance_user = AuthorizationService.create_user_from_sign_in( finance_user = AuthorizationService.create_user_from_sign_in(
{"username": "testuser2", "sub": "testuser2", "iss": "https://test.stuff", "email": "testuser2"} {
"username": "testuser2",
"sub": "testuser2",
"iss": "https://test.stuff",
"email": "testuser2",
}
) )
ProcessInstanceService.complete_form_task( ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, finance_user, human_task processor, spiff_task, {}, finance_user, human_task

View File

@ -1,50 +1,54 @@
"""Process Model.""" """Process Model."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from spiffworkflow_backend.models.user_group_assignment_waiting import UserGroupAssignmentWaitingModel
from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.services.git_service import GitService from spiffworkflow_backend.models.user_group_assignment_waiting import (
UserGroupAssignmentWaitingModel,
)
from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService
class TestUserService(BaseTest): class TestUserService(BaseTest):
"""TestUserService.""" """TestUserService."""
def test_assigning_a_group_to_a_user_before_the_user_is_created ( def test_assigning_a_group_to_a_user_before_the_user_is_created(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_waiting_group_assignments""" """Test_waiting_group_assignments."""
aTestGroup = GroupService.find_or_create_group("aTestGroup") aTestGroup = GroupService.find_or_create_group("aTestGroup")
UserService.add_waiting_group_assignment("initiator_user", aTestGroup) UserService.add_waiting_group_assignment("initiator_user", aTestGroup)
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.groups[0] == aTestGroup assert initiator_user.groups[0] == aTestGroup
def test_assigning_a_group_to_all_users_updates_new_users ( def test_assigning_a_group_to_all_users_updates_new_users(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_waiting_group_assignments""" """Test_waiting_group_assignments."""
everybody_group = GroupService.find_or_create_group("everybodyGroup") everybody_group = GroupService.find_or_create_group("everybodyGroup")
UserService.add_waiting_group_assignment(UserGroupAssignmentWaitingModel.MATCH_ALL_USERS, everybody_group) UserService.add_waiting_group_assignment(
UserGroupAssignmentWaitingModel.MATCH_ALL_USERS, everybody_group
)
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.groups[0] == everybody_group assert initiator_user.groups[0] == everybody_group
def test_assigning_a_group_to_all_users_updates_existing_users ( def test_assigning_a_group_to_all_users_updates_existing_users(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_waiting_group_assignments""" """Test_waiting_group_assignments."""
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
everybody_group = GroupService.find_or_create_group("everybodyGroup") everybody_group = GroupService.find_or_create_group("everybodyGroup")
UserService.add_waiting_group_assignment(UserGroupAssignmentWaitingModel.MATCH_ALL_USERS, everybody_group) UserService.add_waiting_group_assignment(
UserGroupAssignmentWaitingModel.MATCH_ALL_USERS, everybody_group
)
assert initiator_user.groups[0] == everybody_group assert initiator_user.groups[0] == everybody_group

View File

@ -6,7 +6,16 @@ import {
useSearchParams, useSearchParams,
} from 'react-router-dom'; } from 'react-router-dom';
// @ts-ignore // @ts-ignore
import { Button, Modal, Content, Tabs, TabList, Tab, TabPanels, TabPanel } from '@carbon/react'; import {
Button,
Modal,
Content,
Tabs,
TabList,
Tab,
TabPanels,
TabPanel,
} from '@carbon/react';
import Row from 'react-bootstrap/Row'; import Row from 'react-bootstrap/Row';
import Col from 'react-bootstrap/Col'; import Col from 'react-bootstrap/Col';
@ -403,10 +412,10 @@ export default function ProcessModelEditDiagram() {
const jsonEditorOptions = () => { const jsonEditorOptions = () => {
return Object.assign(generalEditorOptions(), { return Object.assign(generalEditorOptions(), {
minimap: { enabled: false }, minimap: { enabled: false },
folding: true folding: true,
}); });
} };
const setPreviousScriptUnitTest = () => { const setPreviousScriptUnitTest = () => {
resetUnitTextResult(); resetUnitTextResult();
@ -472,9 +481,14 @@ export default function ProcessModelEditDiagram() {
let expectedJson = ''; let expectedJson = '';
try { try {
inputJson = JSON.parse(currentScriptUnitTest.inputJson.value); inputJson = JSON.parse(currentScriptUnitTest.inputJson.value);
expectedJson = JSON.parse(currentScriptUnitTest.expectedOutputJson.value); expectedJson = JSON.parse(
currentScriptUnitTest.expectedOutputJson.value
);
} catch (e) { } catch (e) {
setScriptUnitTestResult({ result:false, error:"The JSON provided contains a formatting error."}) setScriptUnitTestResult({
result: false,
error: 'The JSON provided contains a formatting error.',
});
return; return;
} }
@ -487,26 +501,25 @@ export default function ProcessModelEditDiagram() {
bpmn_task_identifier: (scriptElement as any).id, bpmn_task_identifier: (scriptElement as any).id,
python_script: scriptText, python_script: scriptText,
input_json: inputJson, input_json: inputJson,
expected_output_json: expectedJson expected_output_json: expectedJson,
}, },
}); });
} }
}; };
const unitTestFailureElement = () => { const unitTestFailureElement = () => {
if ( if (scriptUnitTestResult && scriptUnitTestResult.result === false) {
scriptUnitTestResult &&
scriptUnitTestResult.result === false
) {
let errorMessage = ''; let errorMessage = '';
if (scriptUnitTestResult.context) { if (scriptUnitTestResult.context) {
errorMessage = 'Unexpected result. Please see the comparison below.'; errorMessage = 'Unexpected result. Please see the comparison below.';
} else if (scriptUnitTestResult.line_number) { } else if (scriptUnitTestResult.line_number) {
errorMessage = `Error encountered running the script. Please check the code around line ${ scriptUnitTestResult.line_number}` errorMessage = `Error encountered running the script. Please check the code around line ${scriptUnitTestResult.line_number}`;
} else { } else {
errorMessage = `Error encountered running the script. ${JSON.stringify(scriptUnitTestResult.error)}` errorMessage = `Error encountered running the script. ${JSON.stringify(
scriptUnitTestResult.error
)}`;
} }
let errorStringElement = <span>{ errorMessage }</span>; let errorStringElement = <span>{errorMessage}</span>;
let errorContextElement = null; let errorContextElement = null;