added missing permissions to elevated perm macro and removed api calls from unit tests

This commit is contained in:
jasquat 2023-05-22 13:50:32 -04:00
parent c6b842f2b8
commit b575ef09de
No known key found for this signature in database
23 changed files with 265 additions and 558 deletions

View File

@ -62,65 +62,5 @@ def with_db_and_bpmn_file_cleanup() -> None:
@pytest.fixture()
def with_super_admin_user() -> UserModel:
"""With_super_admin_user."""
return BaseTest.create_user_with_permission("super_admin")
@pytest.fixture()
def setup_process_instances_for_reports(
client: FlaskClient, with_super_admin_user: UserModel
) -> list[ProcessInstanceModel]:
"""Setup_process_instances_for_reports."""
user = with_super_admin_user
process_group_id = "runs_without_input"
process_model_id = "sample"
# bpmn_file_name = "sample.bpmn"
bpmn_file_location = "sample"
process_model_identifier = BaseTest().create_group_and_model_with_bpmn(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model_id=process_model_id,
# bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
)
# BaseTest().create_process_group(
# client=client, user=user, process_group_id=process_group_id, display_name=process_group_id
# )
# process_model_id = "runs_without_input/sample"
# load_test_spec(
# process_model_id=f"{process_group_id}/{process_model_id}",
# process_model_source_directory="sample"
# )
process_instances = []
for data in [kay(), ray(), jay()]:
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
# process_group_identifier=process_group_id,
process_model_identifier=process_model_identifier,
user=user,
)
processor = ProcessInstanceProcessor(process_instance)
processor.slam_in_data(data)
process_instance.status = "complete"
db.session.add(process_instance)
db.session.commit()
process_instances.append(process_instance)
return process_instances
def kay() -> dict:
"""Kay."""
return {"name": "kay", "grade_level": 2, "test_score": 10}
def ray() -> dict:
"""Ray."""
return {"name": "ray", "grade_level": 1, "test_score": 9}
def jay() -> dict:
"""Jay."""
return {"name": "jay", "grade_level": 2, "test_score": 8}
raise Exception("HEY")
# return BaseTest.create_user_with_permission("super_admin")

View File

@ -149,7 +149,7 @@ paths:
$ref: "#/components/schemas/OkTrue"
/debug/test-raise-error:
get:
post:
operationId: spiffworkflow_backend.routes.debug_controller.test_raise_error
summary: Returns an unhandled exception that should notify sentry, if sentry is configured
tags:
@ -184,7 +184,7 @@ paths:
description: The identifier for the last visited page for the user.
schema:
type: string
get:
post:
tags:
- Active User
operationId: spiffworkflow_backend.routes.active_users_controller.active_user_updates
@ -207,7 +207,7 @@ paths:
description: The identifier for the last visited page for the user.
schema:
type: string
get:
post:
tags:
- Active User
operationId: spiffworkflow_backend.routes.active_users_controller.active_user_unregister

View File

@ -16,10 +16,18 @@ groups:
users: [testuser2, testuser3, testuser4]
permissions:
admin:
process-groups-all:
groups: [admin]
allowed_permissions: [create, read, update, delete]
uri: /*
allowed_permissions: [all]
uri: PG:ALL
basic:
groups: [admin]
allowed_permissions: [all]
uri: BASIC
elevated-operations:
groups: [admin]
allowed_permissions: [all]
uri: ELEVATED
read-all:
groups: ["Finance Team", hr, admin]

View File

@ -257,7 +257,7 @@ def manual_complete_task(
process_instance = ProcessInstanceModel.query.filter(ProcessInstanceModel.id == int(process_instance_id)).first()
if process_instance:
processor = ProcessInstanceProcessor(process_instance)
processor.manual_complete_task(task_guid, execute)
processor.manual_complete_task(task_guid, execute, g.user)
else:
raise ApiError(
error_code="complete_task",

View File

@ -79,6 +79,7 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
{"path": "/process-data-file-download", "relevant_permissions": ["read"]},
{"path": "/process-instance-suspend", "relevant_permissions": ["create"]},
{"path": "/process-instance-terminate", "relevant_permissions": ["create"]},
{"path": "/process-model-natural-language", "relevant_permissions": ["create"]},
{"path": "/process-model-publish", "relevant_permissions": ["create"]},
{"path": "/task-data", "relevant_permissions": ["read", "update"]},
]
@ -487,11 +488,10 @@ class AuthorizationService:
@classmethod
def set_basic_permissions(cls) -> list[PermissionToAssign]:
permissions_to_assign: list[PermissionToAssign] = []
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/active-users/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/process-instances/for-me"))
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/active-users/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/users/exists/by-username"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/connector-proxy/typeahead/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/debug/version-info"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-groups"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-models"))
@ -499,7 +499,11 @@ class AuthorizationService:
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/user-groups/for-current-user"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/users/exists/by-username"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/users/search"))
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/find-by-id/*")
)
@ -522,13 +526,15 @@ class AuthorizationService:
# FIXME: we need to fix so that user that can start a process-model
# can also start through messages as well
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/messages/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/messages"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/authentications"))
permissions_to_assign.append(
PermissionToAssign(permission="create", target_uri="/can-run-privileged-script/*")
)
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/debug/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/send-event/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/task-complete/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/users/search"))
# read comes from PG and PM permissions
permissions_to_assign.append(PermissionToAssign(permission="update", target_uri="/task-data/*"))
@ -697,6 +703,8 @@ class AuthorizationService:
group_identifier = group["name"]
GroupService.find_or_create_group(group_identifier)
for username in group["users"]:
if user_model and username != user_model.username:
continue
user_to_group_dict: UserToGroupDict = {
"username": username,
"group_identifier": group_identifier,
@ -704,6 +712,10 @@ class AuthorizationService:
user_to_group_identifiers.append(user_to_group_dict)
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
unique_user_group_identifiers.add(group_identifier)
for group in group_permissions:
group_identifier = group["name"]
if user_model and group_identifier not in unique_user_group_identifiers:
continue
for permission in group["permissions"]:
for crud_op in permission["actions"]:
permission_assignments.extend(

View File

@ -1109,7 +1109,7 @@ class ProcessInstanceProcessor:
# TODO: do_engine_steps without a lock
self.do_engine_steps(save=True)
def manual_complete_task(self, task_id: str, execute: bool) -> None:
def manual_complete_task(self, task_id: str, execute: bool, user: UserModel) -> None:
"""Mark the task complete optionally executing it."""
spiff_task = self.bpmn_process_instance.get_task_from_id(UUID(task_id))
event_type = ProcessInstanceEventType.task_skipped.value
@ -1122,7 +1122,7 @@ class ProcessInstanceProcessor:
f" instance {self.process_instance_model.id}"
)
human_task = HumanTaskModel.query.filter_by(task_id=task_id).first()
self.complete_task(spiff_task, human_task=human_task, user=g.user)
self.complete_task(spiff_task, human_task=human_task, user=user)
elif execute:
current_app.logger.info(
f"Manually executing Task {spiff_task.task_spec.name} of process"

View File

@ -9,9 +9,12 @@ from typing import Optional
from flask import current_app
from flask.testing import FlaskClient
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from werkzeug.test import TestResponse # type: ignore
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.permission_assignment import Permission
@ -314,9 +317,11 @@ class BaseTest:
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Create_user_with_permission."""
user = BaseTest.find_or_create_user(username=username)
return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names)
# user = BaseTest.find_or_create_user(username=username)
# return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names)
user = BaseTest.find_or_create_user(username='testadmin1')
AuthorizationService.import_permissions_from_yaml_file(user)
return user
@classmethod
def add_permissions_to_user(
@ -325,7 +330,6 @@ class BaseTest:
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Add_permissions_to_user."""
permission_target = AuthorizationService.find_or_create_permission_target(target_uri)
if permission_names is None:
@ -401,3 +405,67 @@ class BaseTest:
def empty_report_metadata_body(self) -> ReportMetadata:
return {"filter_by": [], "columns": [], "order_by": []}
def start_sender_process(
self,
client: FlaskClient,
payload: dict,
group_name: str = "test_group",
) -> ProcessInstanceModel:
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
process_instance = self.create_process_instance_from_process_model(
process_model
)
processor_send_receive = ProcessInstanceProcessor(process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
payload,
process_instance.process_initiator,
human_task,
)
processor_send_receive.save()
return process_instance
def assure_a_message_was_sent(self, process_instance: ProcessInstanceModel, payload: dict) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert send_message.payload == payload, "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self, process_instance: ProcessInstanceModel) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None

View File

@ -10,7 +10,7 @@ class TestDebugController(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
response = client.get(
response = client.post(
"/v1.0/debug/test-raise-error",
)
assert response.status_code == 500

View File

@ -0,0 +1,63 @@
import pytest
from flask import Flask
from flask import g
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.routes.messages_controller import message_send
class TestMessages(BaseTest):
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
process_instance = self.start_sender_process(client, payload, "test_from_api")
self.assure_a_message_was_sent(process_instance, payload)
self.assure_there_is_a_process_waiting_on_a_message(process_instance)
g.user = process_instance.process_initiator
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("Approval Result", {"payload": {"po_number": 5001}})
# Should return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert process_instance.status == "complete"

View File

@ -1,5 +1,6 @@
"""Test_message_service."""
import pytest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -60,7 +61,6 @@ class TestAuthorizationService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_user_can_be_added_to_human_task_on_first_login."""
initiator_user = self.find_or_create_user("initiator_user")
@ -69,16 +69,12 @@ class TestAuthorizationService(BaseTest):
self.find_or_create_user("testuser1")
AuthorizationService.import_permissions_from_yaml_file()
process_model_identifier = self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id="test_group",
process_model_id="model_with_lanes",
process_model = load_test_spec(
process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes.bpmn",
bpmn_file_location="model_with_lanes",
process_model_source_directory="model_with_lanes",
)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
@ -141,6 +137,7 @@ class TestAuthorizationService(BaseTest):
"delete",
),
("/process-instances/some-process-group:some-process-model:*", "read"),
('/process-model-natural-language/some-process-group:some-process-model:*', "create"),
("/process-model-publish/some-process-group:some-process-model:*", "create"),
("/process-models/some-process-group:some-process-model:*", "create"),
("/process-models/some-process-group:some-process-model:*", "delete"),
@ -163,7 +160,7 @@ class TestAuthorizationService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_group."""
expected_permissions = [
expected_permissions = sorted([
("/event-error-details/some-process-group:some-process-model:*", "read"),
(
"/logs/some-process-group:some-process-model:*",
@ -182,7 +179,7 @@ class TestAuthorizationService(BaseTest):
"read",
),
("/process-instances/some-process-group:some-process-model:*", "create"),
]
])
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PG:/some-process-group/some-process-model"
)
@ -222,6 +219,7 @@ class TestAuthorizationService(BaseTest):
"delete",
),
("/process-instances/some-process-group:some-process-model/*", "read"),
('/process-model-natural-language/some-process-group:some-process-model/*', "create"),
("/process-model-publish/some-process-group:some-process-model/*", "create"),
("/process-models/some-process-group:some-process-model/*", "create"),
("/process-models/some-process-group:some-process-model/*", "delete"),
@ -244,7 +242,7 @@ class TestAuthorizationService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_model."""
expected_permissions = [
expected_permissions = sorted([
(
"/event-error-details/some-process-group:some-process-model/*",
"read",
@ -263,7 +261,7 @@ class TestAuthorizationService(BaseTest):
"read",
),
("/process-instances/some-process-group:some-process-model/*", "create"),
]
])
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PM:/some-process-group/some-process-model"
)
@ -276,8 +274,9 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
("/active-users/*", "read"),
expected_permissions = sorted([
("/active-users/*", "create"),
("/connector-proxy/typeahead/*", "read"),
("/debug/version-info", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"),
@ -297,7 +296,8 @@ class TestAuthorizationService(BaseTest):
("/tasks/*", "update"),
("/user-groups/for-current-user", "read"),
("/users/exists/by-username", "create"),
]
("/users/search", "read"),
])
permissions_to_assign = AuthorizationService.explode_permissions("all", "BASIC")
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
@ -308,8 +308,11 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
expected_permissions = sorted([
("/authentications", "read"),
("/can-run-privileged-script/*", "create"),
("/debug/*", "create"),
("/messages", "read"),
("/messages/*", "create"),
("/process-instance-reset/*", "create"),
("/process-instance-resume/*", "create"),
@ -326,8 +329,7 @@ class TestAuthorizationService(BaseTest):
("/send-event/*", "create"),
("/task-complete/*", "create"),
("/task-data/*", "update"),
("/users/search", "read"),
]
])
permissions_to_assign = AuthorizationService.explode_permissions("all", "ELEVATED")
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
@ -437,7 +439,7 @@ class TestAuthorizationService(BaseTest):
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_two").first() is None
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
self.assert_user_has_permission(admin_user, "create", "/v1.0/process-groups/whatever")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
@ -469,7 +471,7 @@ class TestAuthorizationService(BaseTest):
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False)
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
self.assert_user_has_permission(admin_user, "create", "/v1.0/process-groups/whatever")
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey", expected_result=False)
assert GroupModel.query.filter_by(identifier="group_three").first() is not None

View File

@ -1,5 +1,6 @@
"""Test_various_bpmn_constructs."""
from flask.app import Flask
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -18,29 +19,17 @@ class TestDotNotation(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_form_data_conversion_to_dot_dict."""
process_group_id = "dot_notation_group"
process_model_id = "test_dot_notation"
process_model_id = "dot_notation_group/test_dot_notation"
bpmn_file_name = "diagram.bpmn"
bpmn_file_location = "dot_notation"
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model = load_test_spec(
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
process_model_source_directory=bpmn_file_location,
)
headers = self.logged_in_headers(with_super_admin_user)
response = self.create_process_instance_from_process_model_id_with_api(
client, process_model_identifier, headers
)
process_instance_id = response.json["id"]
process_instance = ProcessInstanceService().get_process_instance(process_instance_id)
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
human_task = process_instance.human_tasks[0]
@ -53,7 +42,7 @@ class TestDotNotation(BaseTest):
"invoice.invoiceAmount": "1000.00",
"invoice.dueDate": "09/30/2022",
}
ProcessInstanceService.complete_form_task(processor, user_task, form_data, with_super_admin_user, human_task)
ProcessInstanceService.complete_form_task(processor, user_task, form_data, process_instance.process_initiator, human_task)
expected = {
"contibutorName": "Elizabeth",

View File

@ -28,10 +28,10 @@ class TestErrorHandlingService(BaseTest):
"""
def run_process_model_and_handle_error(
self, process_model: ProcessModelInfo, user: UserModel
self, process_model: ProcessModelInfo
) -> ProcessInstanceModel:
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id, user
process_instance = self.create_process_instance_from_process_model(
process_model
)
pip = ProcessInstanceProcessor(process_instance)
with pytest.raises(WorkflowExecutionServiceError) as e:
@ -44,7 +44,6 @@ class TestErrorHandlingService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Process Model in DB marked as suspended when error occurs."""
process_model = load_test_spec(
@ -54,13 +53,13 @@ class TestErrorHandlingService(BaseTest):
)
# Process instance should be marked as errored by default.
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.error.value == process_instance.status
# If process model should be suspended on error, then that is what should happen.
process_model.fault_or_suspend_on_exception = "suspend"
ProcessModelService.save_process_model(process_model)
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.suspended.value == process_instance.status
def test_error_sends_bpmn_message(
@ -68,7 +67,6 @@ class TestErrorHandlingService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Real BPMN Messages should get generated and processes should fire off and complete."""
process_model = load_test_spec(
@ -85,7 +83,7 @@ class TestErrorHandlingService(BaseTest):
process_model.exception_notification_addresses = ["dan@ILoveToReadErrorsInMyEmails.com"]
ProcessModelService.save_process_model(process_model)
# kick off the process and assure it got marked as an error.
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.error.value == process_instance.status
# Both send and receive messages should be generated, matched

View File

@ -1,7 +1,9 @@
"""Test_message_instance."""
import pytest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
@ -11,36 +13,25 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
class TestMessageInstance(BaseTest):
"""TestMessageInstance."""
def setup_message_tests(self, client: FlaskClient, user: UserModel) -> str:
"""Setup_message_tests."""
process_group_id = "test_group"
process_model_id = "hello_world"
def setup_message_tests(self, client: FlaskClient) -> ProcessModelInfo:
process_model_id = "testk_group/hello_world"
bpmn_file_name = "hello_world.bpmn"
bpmn_file_location = "hello_world"
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
user,
process_group_id=process_group_id,
process_model = load_test_spec(
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
process_model_source_directory=bpmn_file_location,
)
return process_model_identifier
return process_model
def test_can_create_message_instance(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_create_message_instance."""
message_name = "Message Model One"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel(
@ -64,12 +55,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_status."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception:
@ -100,13 +88,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_message_type."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception:
@ -136,13 +120,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_force_failure_cause_if_status_is_failure."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel(

View File

@ -1,15 +1,10 @@
"""Test_message_service."""
import pytest
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.messages_controller import message_send
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -20,66 +15,11 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestMessageService(BaseTest):
"""TestMessageService."""
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
self.start_sender_process(client, with_super_admin_user, "test_from_api")
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("Approval Result", {"payload": {"po_number": 5001}})
# Should return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == "complete"
def test_single_conversation_between_two_processes(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test messages between two different running processes using a single conversation.
@ -87,7 +27,7 @@ class TestMessageService(BaseTest):
we have two process instances that are communicating with each other using one conversation about an
Invoice whose details are defined in the following message payload
"""
self.payload = {
payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
@ -104,8 +44,8 @@ class TestMessageService(BaseTest):
)
# Now start the main process
self.start_sender_process(client, with_super_admin_user, "test_between_processes")
self.assure_a_message_was_sent()
process_instance = self.start_sender_process(client, payload, "test_between_processes")
self.assure_a_message_was_sent(process_instance, payload)
# This is typically called in a background cron process, so we will manually call it
# here in the tests
@ -113,7 +53,7 @@ class TestMessageService(BaseTest):
MessageService.correlate_all_message_instances()
# The sender process should still be waiting on a message to be returned to it ...
self.assure_there_is_a_process_waiting_on_a_message()
self.assure_there_is_a_process_waiting_on_a_message(process_instance)
# The second time we call ths process_message_isntances (again it would typically be running on cron)
# it will deliver the message that was sent from the receiver back to the original sender.
@ -125,7 +65,7 @@ class TestMessageService(BaseTest):
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
@ -136,7 +76,7 @@ class TestMessageService(BaseTest):
assert len(waiting_messages) == 0
# The message sender process is complete
assert self.process_instance.status == "complete"
assert process_instance.status == "complete"
# The message receiver process is also complete
message_receiver_process = (
@ -146,83 +86,15 @@ class TestMessageService(BaseTest):
)
assert message_receiver_process.status == "complete"
def start_sender_process(
self,
client: FlaskClient,
with_super_admin_user: UserModel,
group_name: str = "test_group",
) -> None:
process_group_id = group_name
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id,
with_super_admin_user,
)
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
self.payload,
with_super_admin_user,
human_task,
)
processor_send_receive.save()
def assure_a_message_was_sent(self) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert send_message.payload == self.payload, "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
def test_can_send_message_to_multiple_process_models(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_multiple_process_models."""
process_group_id = "test_group_multi"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
# self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_sender = load_test_spec(
"test_group/message_sender",

View File

@ -1,4 +1,3 @@
"""Test Permissions."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -22,18 +21,13 @@ from spiffworkflow_backend.services.user_service import UserService
# * super-admins users maybe conventionally get the user role as well
# finance-admin role allows create, update, and delete of all models under the finance group
class TestPermissions(BaseTest):
"""TestPermissions."""
def test_user_can_be_given_permission_to_administer_process_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_user_can_be_given_permission_to_administer_process_group."""
process_group_id = "group-a"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
load_test_spec(
"group-a/timers_intermediate_catch_event",
bpmn_file_name="timers_intermediate_catch_event.bpmn",
@ -58,7 +52,6 @@ class TestPermissions(BaseTest):
def test_group_a_admin_needs_to_stay_away_from_group_b(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_group_a_admin_needs_to_stay_away_from_group_b."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_b_id = process_group_ids[1]
@ -87,7 +80,6 @@ class TestPermissions(BaseTest):
self.assert_user_has_permission(group_a_admin, "update", f"/{process_group_b_id}", expected_result=False)
def test_user_can_be_granted_access_through_a_group(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
"""Test_user_can_be_granted_access_through_a_group."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
for process_group_id in process_group_ids:
@ -125,7 +117,6 @@ class TestPermissions(BaseTest):
def test_user_can_be_read_models_with_global_permission(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_user_can_be_read_models_with_global_permission."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_b_id = process_group_ids[1]
@ -156,7 +147,6 @@ class TestPermissions(BaseTest):
def test_user_can_access_base_path_when_given_wildcard_permission(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_user_can_access_base_path_when_given_wildcard_permission."""
group_a_admin = self.find_or_create_user()
permission_target = PermissionTargetModel(uri="/process-models/%")

View File

@ -70,10 +70,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user = self.find_or_create_user("testuser2")
assert initiator_user.principal is not None
@ -138,10 +137,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task_when_using_dict."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
finance_user_four = self.find_or_create_user("testuser4")
@ -234,7 +232,6 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
initiator_user = self.find_or_create_user("initiator_user")
@ -264,9 +261,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -319,9 +315,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -439,15 +434,14 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/boundary_event_reset",
process_model_source_directory="boundary_event_reset",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
process_model=process_model
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
@ -455,7 +449,7 @@ class TestProcessInstanceProcessor(BaseTest):
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(
processor, spiff_manual_task, {}, with_super_admin_user, human_task_one
processor, spiff_manual_task, {}, process_instance.process_initiator, human_task_one
)
assert (
len(process_instance.active_human_tasks) == 1
@ -473,7 +467,7 @@ class TestProcessInstanceProcessor(BaseTest):
human_task_one = process_instance.active_human_tasks[0]
assert human_task_one.task_title == "Manual Task #1"
processor = ProcessInstanceProcessor(process_instance)
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
processor.manual_complete_task(str(human_task_one.task_id), execute=True, user=process_instance.process_initiator)
processor = ProcessInstanceProcessor(process_instance)
processor.resume()
processor.do_engine_steps(save=True)
@ -490,22 +484,21 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/step_through_gateway",
process_model_source_directory="step_through_gateway",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
process_model=process_model
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0]
processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
processor.manual_complete_task(str(human_task_one.task_id), execute=True, user=process_instance.process_initiator)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
step1_task = processor.get_task_by_bpmn_identifier("step_1", processor.bpmn_process_instance)
@ -516,7 +509,7 @@ class TestProcessInstanceProcessor(BaseTest):
assert gateway_task.state == TaskState.READY
gateway_task = processor.bpmn_process_instance.get_tasks(TaskState.READY)[0]
processor.manual_complete_task(str(gateway_task.id), execute=True)
processor.manual_complete_task(str(gateway_task.id), execute=True, user=process_instance.process_initiator)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
gateway_task = processor.get_task_by_bpmn_identifier("Gateway_Open", processor.bpmn_process_instance)
@ -528,9 +521,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -757,10 +749,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -868,7 +859,6 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_task_data_is_set_even_if_process_instance_errors."""
process_model = load_test_spec(
@ -877,7 +867,7 @@ class TestProcessInstanceProcessor(BaseTest):
process_model_source_directory="error",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
process_model=process_model
)
processor = ProcessInstanceProcessor(process_instance)

View File

@ -1,141 +0,0 @@
# from typing import Optional
#
# from flask.app import Flask
# from tests.spiffworkflow_backend.helpers.base_test import BaseTest
#
# from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
# from spiffworkflow_backend.models.process_instance_report import (
# ProcessInstanceReportModel,
# )
#
# # from tests.spiffworkflow_backend.helpers.test_data import find_or_create_process_group
# # from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
# # from spiffworkflow_backend.models.permission_target import PermissionTargetModel
#
#
# def test_generate_report_with_filter_by(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_user_can_be_given_permission_to_administer_process_group."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "filter_by": [
# {"field_name": "grade_level", "operator": "equals", "field_value": 2}
# ]
# }
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 2
# names = get_names_from_results(results)
# assert names == ["kay", "jay"]
#
#
# def test_generate_report_with_filter_by_with_variable_substitution(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_filter_by_with_variable_substitution."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "filter_by": [
# {
# "field_name": "grade_level",
# "operator": "equals",
# "field_value": "{{grade_level}}",
# }
# ]
# }
# results = do_report_with_metadata_and_instances(
# report_metadata, process_instances, {"grade_level": 1}
# )
# assert len(results) == 1
# names = get_names_from_results(results)
# assert names == ["ray"]
#
#
# def test_generate_report_with_order_by_and_one_field(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_and_one_field."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["jay", "ray", "kay"]
#
#
# def test_generate_report_with_order_by_and_two_fields(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_and_two_fields."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["grade_level", "test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["ray", "jay", "kay"]
#
#
# def test_generate_report_with_order_by_desc(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_desc."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["grade_level", "-test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["ray", "kay", "jay"]
#
#
# def test_generate_report_with_columns(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_columns."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "columns": [
# {"Header": "Name", "accessor": "name"},
# {"Header": "Status", "accessor": "status"},
# ],
# "order_by": ["test_score"],
# "filter_by": [
# {"field_name": "grade_level", "operator": "equals", "field_value": 1}
# ],
# }
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 1
# assert results == [{"name": "ray", "status": "complete"}]
#
#
# def do_report_with_metadata_and_instances(
# report_metadata: dict,
# process_instances: list[ProcessInstanceModel],
# substitution_variables: Optional[dict] = None,
# ) -> list[dict]:
# """Do_report_with_metadata_and_instances."""
# process_instance_report = ProcessInstanceReportModel.create_report(
# identifier="sure",
# report_metadata=report_metadata,
# user=BaseTest.find_or_create_user(),
# )
#
# return process_instance_report.generate_report(
# process_instances, substitution_variables
# )["results"]
#
#
# def get_names_from_results(results: list[dict]) -> list[str]:
# """Get_names_from_results."""
# return [result["name"] for result in results]

View File

@ -40,7 +40,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
model = ProcessInstanceService.file_data_model_for_value(
"uploaded_file",
@ -53,7 +52,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
model = ProcessInstanceService.file_data_model_for_value(
"not_a_file",
@ -66,7 +64,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_file": self.SAMPLE_FILE_DATA,
@ -80,7 +77,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
@ -95,7 +91,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -120,7 +115,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -135,7 +129,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_file": self.SAMPLE_FILE_DATA,
@ -156,7 +149,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -171,7 +163,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -198,7 +189,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"File": [

View File

@ -18,9 +18,7 @@ class TestRestrictedScriptEngine(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
bpmn_file_name="read_etc_passwd.bpmn",
@ -40,9 +38,7 @@ class TestRestrictedScriptEngine(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
bpmn_file_name="read_env.bpmn",

View File

@ -1,4 +1,3 @@
"""Test Permissions."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -13,20 +12,15 @@ from spiffworkflow_backend.services.script_unit_test_runner import ScriptUnitTes
class TestScriptUnitTestRunner(BaseTest):
"""TestScriptUnitTestRunner."""
def test_takes_data_and_returns_expected_result(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_takes_data_and_returns_expected_result."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -56,14 +50,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_fails_when_expected_output_does_not_match_actual_output."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -93,14 +83,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -126,14 +112,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_not_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests"
process_model_identifier = f"{process_group_id}/{process_model_id}"

View File

@ -22,8 +22,10 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
class TestSpecFileService(BaseTest):
"""TestSpecFileService."""
process_group_id = "test_process_group_id"
process_model_id = "call_activity_nested"
process_group_id = ""
process_model_id = "test_process_group_id/call_activity_nested"
# process_group_id = "test_process_group_id"
# process_model_id = "call_activity_nested"
bpmn_file_name = "call_activity_nested.bpmn"
call_activity_nested_relative_file_path = os.path.join(process_group_id, process_model_id, bpmn_file_name)
@ -33,16 +35,11 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_store_process_ids_for_lookup."""
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location="call_activity_nested",
process_model_source_directory='call_activity_nested',
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
assert len(bpmn_process_id_lookups) == 1
@ -54,17 +51,13 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_fails_to_save_duplicate_process_id."""
bpmn_process_identifier = "Level1"
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory='call_activity_nested',
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
assert len(bpmn_process_id_lookups) == 1
@ -87,7 +80,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_updates_relative_file_path_when_appropriate."""
bpmn_process_identifier = "Level1"
@ -99,13 +91,10 @@ class TestSpecFileService(BaseTest):
db.session.add(process_id_lookup)
db.session.commit()
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory='call_activity_nested',
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
@ -139,7 +128,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""When a BPMN processes identifier is changed in a file, the old id is removed from the cache."""
old_identifier = "ye_old_identifier"
@ -147,19 +135,16 @@ class TestSpecFileService(BaseTest):
identifier=old_identifier,
relative_path=self.call_activity_nested_relative_file_path,
file_name=self.bpmn_file_name,
process_model_id=f"{self.process_group_id}/{self.process_model_id}",
process_model_id=self.process_model_id,
type="process",
)
db.session.add(process_id_lookup)
db.session.commit()
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory='call_activity_nested',
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
@ -173,7 +158,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_load_reference_information.
@ -186,32 +170,22 @@ class TestSpecFileService(BaseTest):
a DMN file can (theoretically) contain many decisions. So this
is an array.
"""
process_group_id = "test_group"
process_model_id = "call_activity_nested"
process_model_identifier = self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=process_group_id,
process_model_id = "test_group/call_activity_nested"
process_model = load_test_spec(
process_model_id=process_model_id,
# bpmn_file_name=bpmn_file_name,
bpmn_file_location=process_model_id,
process_model_source_directory='call_activity_nested',
)
# load_test_spec(
# ,
# process_model_source_directory="call_activity_nested",
# )
process_model_info = ProcessModelService.get_process_model(process_model_identifier)
files = SpecFileService.get_files(process_model_info)
files = SpecFileService.get_files(process_model)
file = next(filter(lambda f: f.name == "call_activity_level_3.bpmn", files))
ca_3 = SpecFileService.get_references_for_file(file, process_model_info)
ca_3 = SpecFileService.get_references_for_file(file, process_model)
assert len(ca_3) == 1
assert ca_3[0].display_name == "Level 3"
assert ca_3[0].identifier == "Level3"
assert ca_3[0].type == "process"
file = next(filter(lambda f: f.name == "level2c.dmn", files))
dmn1 = SpecFileService.get_references_for_file(file, process_model_info)
dmn1 = SpecFileService.get_references_for_file(file, process_model)
assert len(dmn1) == 1
assert dmn1[0].display_name == "Decision 1"
assert dmn1[0].identifier == "Decision_0vrtcmk"

View File

@ -1,5 +1,6 @@
"""Test_various_bpmn_constructs."""
from flask.app import Flask
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -11,25 +12,16 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
class TestVariousBpmnConstructs(BaseTest):
"""TestVariousBpmnConstructs."""
def test_running_process_with_timer_intermediate_catch_event(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_running_process_with_timer_intermediate_catch_event."""
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
with_super_admin_user,
"test_group",
"timer_intermediate_catch_event",
process_model = load_test_spec(
process_model_id="test_group/timer_intermediate_catch_event",
process_model_source_directory='timer_intermediate_catch_event',
)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)

View File

@ -18,6 +18,7 @@ export default function ActiveUsers() {
HttpService.makeCallToBackend({
path: `/active-users/updates/${lastVisitedIdentifier}`,
successCallback: setActiveUsers,
httpMethod: 'POST',
});
};
@ -25,6 +26,7 @@ export default function ActiveUsers() {
HttpService.makeCallToBackend({
path: `/active-users/unregister/${lastVisitedIdentifier}`,
successCallback: setActiveUsers,
httpMethod: 'POST',
});
};
updateActiveUsers();