import io import json import os import time from typing import Any from flask import current_app from flask.testing import FlaskClient from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.permission_assignment import Permission from spiffworkflow_backend.models.permission_target import PermissionTargetModel from spiffworkflow_backend.models.process_group import ProcessGroup from spiffworkflow_backend.models.process_group import ProcessGroupSchema from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance_report import ReportMetadata from spiffworkflow_backend.models.process_model import NotificationType from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.file_system_service import FileSystemService from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.user_service import UserService from werkzeug.test import TestResponse # type: ignore from tests.spiffworkflow_backend.helpers.test_data import load_test_spec # from tests.spiffworkflow_backend.helpers.test_data import logged_in_headers class BaseTest: @staticmethod def find_or_create_user(username: str = "test_user_1") -> UserModel: user = UserModel.query.filter_by(username=username).first() if isinstance(user, UserModel): return user user = UserService.create_user(username, "internal", username) if isinstance(user, UserModel): return user raise ApiError( error_code="create_user_error", message=f"Cannot find or create user: {username}", ) @staticmethod def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> dict[str, str]: return dict(Authorization="Bearer " + user.encode_auth_token()) def create_group_and_model_with_bpmn( self, client: FlaskClient, user: UserModel, process_group_id: str | None = "test_group", process_model_id: str | None = "random_fact", bpmn_file_name: str | None = None, bpmn_file_location: str | None = None, ) -> str: """Creates a process group. Creates a process model Adds a bpmn file to the model. """ process_group_display_name = process_group_id or "" process_group_description = process_group_id or "" process_model_identifier = f"{process_group_id}/{process_model_id}" if bpmn_file_location is None: bpmn_file_location = process_model_id self.create_process_group_with_api(client, user, process_group_description, process_group_display_name) self.create_process_model_with_api( client, process_model_id=process_model_identifier, process_model_display_name=process_group_display_name, process_model_description=process_group_description, user=user, ) load_test_spec( process_model_id=process_model_identifier, bpmn_file_name=bpmn_file_name, process_model_source_directory=bpmn_file_location, ) return process_model_identifier def create_process_group( self, process_group_id: str, display_name: str = "", ) -> ProcessGroup: process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False) return ProcessModelService.add_process_group(process_group) def create_process_group_with_api( self, client: FlaskClient, user: Any, process_group_id: str, display_name: str = "", ) -> str: process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False) response = client.post( "/v1.0/process-groups", headers=self.logged_in_headers(user), content_type="application/json", data=json.dumps(ProcessGroupSchema().dump(process_group)), ) assert response.status_code == 201 assert response.json is not None assert response.json["id"] == process_group_id return process_group_id def create_process_model_with_api( self, client: FlaskClient, process_model_id: str | None = None, process_model_display_name: str = "Cooooookies", process_model_description: str = "Om nom nom delicious cookies", fault_or_suspend_on_exception: str = NotificationType.suspend.value, exception_notification_addresses: list | None = None, primary_process_id: str | None = None, primary_file_name: str | None = None, user: UserModel | None = None, ) -> TestResponse: if process_model_id is not None: # make sure we have a group process_group_id, _ = os.path.split(process_model_id) modified_process_group_id = process_group_id.replace("/", ":") process_group_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_group_id)) if ProcessModelService.is_process_group(process_group_path): if exception_notification_addresses is None: exception_notification_addresses = [] model = ProcessModelInfo( id=process_model_id, display_name=process_model_display_name, description=process_model_description, primary_process_id=primary_process_id, primary_file_name=primary_file_name, fault_or_suspend_on_exception=fault_or_suspend_on_exception, exception_notification_addresses=exception_notification_addresses, ) if user is None: user = self.find_or_create_user() response = client.post( f"/v1.0/process-models/{modified_process_group_id}", content_type="application/json", data=json.dumps(ProcessModelInfoSchema().dump(model)), headers=self.logged_in_headers(user), ) assert response.status_code == 201 return response else: raise Exception("You must create the group first") else: raise Exception("You must include the process_model_id, which must be a path to the model") def get_test_data_file_full_path(self, file_name: str, process_model_test_data_dir: str) -> str: return os.path.join( current_app.instance_path, "..", "..", "tests", "data", process_model_test_data_dir, file_name, ) def get_test_data_file_contents(self, file_name: str, process_model_test_data_dir: str) -> bytes: file_full_path = self.get_test_data_file_full_path(file_name, process_model_test_data_dir) with open(file_full_path, "rb") as file: return file.read() def create_spec_file( self, client: FlaskClient, process_model_id: str, process_model_location: str | None = None, process_model: ProcessModelInfo | None = None, file_name: str = "random_fact.svg", file_data: bytes = b"abcdef", user: UserModel | None = None, ) -> Any: """Test_create_spec_file. Adds a bpmn file to the model. process_model_id is the destination path process_model_location is the source path because of permissions, user might be required now..., not sure yet. """ if process_model_location is None: process_model_location = file_name.split(".")[0] if process_model is None: process_model = load_test_spec( process_model_id=process_model_id, bpmn_file_name=file_name, process_model_source_directory=process_model_location, ) data = {"file": (io.BytesIO(file_data), file_name)} if user is None: user = self.find_or_create_user() modified_process_model_id = process_model.id.replace("/", ":") response = client.post( f"/v1.0/process-models/{modified_process_model_id}/files", data=data, follow_redirects=True, content_type="multipart/form-data", headers=self.logged_in_headers(user), ) assert response.status_code == 201 assert response.get_data() is not None file = json.loads(response.get_data(as_text=True)) # assert FileType.svg.value == file["type"] # assert "image/svg+xml" == file["content_type"] response = client.get( f"/v1.0/process-models/{modified_process_model_id}/files/{file_name}", headers=self.logged_in_headers(user), ) assert response.status_code == 200 file2 = json.loads(response.get_data(as_text=True)) assert file["file_contents"] == file2["file_contents"] return file @staticmethod def create_process_instance_from_process_model_id_with_api( client: FlaskClient, test_process_model_id: str, headers: dict[str, str], ) -> TestResponse: """Create_process_instance. There must be an existing process model to instantiate. """ if not ProcessModelService.is_process_model_identifier(test_process_model_id): dirname = os.path.dirname(test_process_model_id) if not ProcessModelService.is_process_group_identifier(dirname): process_group = ProcessGroup(id=dirname, display_name=dirname) ProcessModelService.add_process_group(process_group) basename = os.path.basename(test_process_model_id) load_test_spec( process_model_id=test_process_model_id, process_model_source_directory=basename, bpmn_file_name=basename, ) modified_process_model_id = test_process_model_id.replace("/", ":") response = client.post( f"/v1.0/process-instances/{modified_process_model_id}", headers=headers, ) assert response.status_code == 201 return response # @staticmethod # def get_public_access_token(username: str, password: str) -> dict: # """Get_public_access_token.""" # public_access_token = AuthenticationService().get_public_access_token( # username, password # ) # return public_access_token def create_process_instance_from_process_model( self, process_model: ProcessModelInfo, status: str | None = "not_started", user: UserModel | None = None, ) -> ProcessInstanceModel: if user is None: user = self.find_or_create_user() current_time = round(time.time()) process_instance = ProcessInstanceModel( status=status, process_initiator=user, process_model_identifier=process_model.id, process_model_display_name=process_model.display_name, updated_at_in_seconds=round(time.time()), start_in_seconds=current_time - (3600 * 1), end_in_seconds=current_time - (3600 * 1 - 20), ) db.session.add(process_instance) db.session.commit() run_at_in_seconds = round(time.time()) ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds) return process_instance @classmethod def create_user_with_permission( cls, username: str, target_uri: str = PermissionTargetModel.URI_ALL, permission_names: list[str] | None = None, ) -> UserModel: user = BaseTest.find_or_create_user(username=username) return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names) @classmethod def add_permissions_to_user( cls, user: UserModel, target_uri: str = PermissionTargetModel.URI_ALL, permission_names: list[str] | None = None, ) -> UserModel: permission_target = AuthorizationService.find_or_create_permission_target(target_uri) if permission_names is None: permission_names = [member.name for member in Permission] for permission in permission_names: AuthorizationService.create_permission_for_principal( principal=user.principal, permission_target=permission_target, permission=permission, ) return user def assert_user_has_permission( self, user: UserModel, permission: str, target_uri: str, expected_result: bool = True, ) -> None: has_permission = AuthorizationService.user_has_permission( user=user, permission=permission, target_uri=target_uri, ) assert has_permission is expected_result def modify_process_identifier_for_path_param(self, identifier: str) -> str: return ProcessModelInfo.modify_process_identifier_for_path_param(identifier) def un_modify_modified_process_identifier_for_path_param(self, modified_identifier: str) -> str: return modified_identifier.replace(":", "/") def create_process_model_with_metadata(self) -> ProcessModelInfo: self.create_process_group("test_group", "test_group") process_model = load_test_spec( "test_group/hello_world", process_model_source_directory="nested-task-data-structure", ) ProcessModelService.update_process_model( process_model, { "metadata_extraction_paths": [ {"key": "awesome_var", "path": "outer.inner"}, {"key": "invoice_number", "path": "invoice_number"}, ] }, ) return process_model def post_to_process_instance_list( self, client: FlaskClient, user: UserModel, report_metadata: ReportMetadata | None = None, param_string: str | None = "", ) -> TestResponse: report_metadata_to_use = report_metadata if report_metadata_to_use is None: report_metadata_to_use = self.empty_report_metadata_body() response = client.post( f"/v1.0/process-instances{param_string}", headers=self.logged_in_headers(user), content_type="application/json", data=json.dumps({"report_metadata": report_metadata_to_use}), ) assert response.status_code == 200 assert response.json is not None return response def empty_report_metadata_body(self) -> ReportMetadata: return {"filter_by": [], "columns": [], "order_by": []} def start_sender_process( self, client: FlaskClient, payload: dict, group_name: str = "test_group", ) -> ProcessInstanceModel: process_model = load_test_spec( "test_group/message", process_model_source_directory="message_send_one_conversation", bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives ) process_instance = self.create_process_instance_from_process_model(process_model) processor_send_receive = ProcessInstanceProcessor(process_instance) processor_send_receive.do_engine_steps(save=True) task = processor_send_receive.get_all_user_tasks()[0] human_task = process_instance.active_human_tasks[0] ProcessInstanceService.complete_form_task( processor_send_receive, task, payload, process_instance.process_initiator, human_task, ) processor_send_receive.save() return process_instance def assure_a_message_was_sent(self, process_instance: ProcessInstanceModel, payload: dict) -> None: # There should be one new send message for the given process instance. send_messages = ( MessageInstanceModel.query.filter_by(message_type="send") .filter_by(process_instance_id=process_instance.id) .order_by(MessageInstanceModel.id) .all() ) assert len(send_messages) == 1 send_message = send_messages[0] assert send_message.payload == payload, "The send message should match up with the payload" assert send_message.name == "Request Approval" assert send_message.status == "ready" def assure_there_is_a_process_waiting_on_a_message(self, process_instance: ProcessInstanceModel) -> None: # There should be one new send message for the given process instance. waiting_messages = ( MessageInstanceModel.query.filter_by(message_type="receive") .filter_by(status="ready") .filter_by(process_instance_id=process_instance.id) .order_by(MessageInstanceModel.id) .all() ) assert len(waiting_messages) == 1 waiting_message = waiting_messages[0] self.assure_correlation_properties_are_right(waiting_message) def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None: # Correlation Properties should match up po_curr = next(c for c in message.correlation_rules if c.name == "po_number") customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id") assert po_curr is not None assert customer_curr is not None