"""Test Process Api Blueprint.""" import io import json import time from typing import Any import pytest import requests from flask.app import Flask from flask.testing import FlaskClient from flask_bpmn.models.db import db from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import logged_in_headers from spiffworkflow_backend.exceptions.process_entity_not_found_error import ( ProcessEntityNotFoundError, ) from spiffworkflow_backend.models.process_group import ProcessGroup from spiffworkflow_backend.models.process_group import ProcessGroupSchema from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance_report import ( ProcessInstanceReportModel, ) from spiffworkflow_backend.models.process_model import NotificationType from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema from spiffworkflow_backend.models.task_event import TaskEventModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) from spiffworkflow_backend.services.process_model_service import ProcessModelService class TestProcessApi(BaseTest): """TestProcessAPi.""" def test_process_model_add( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_add_new_process_model.""" # group_id = None, model_id = "make_cookies" model_display_name = "Cooooookies" model_description = "Om nom nom delicious cookies" self.create_process_model( client, process_group_id=None, process_model_id=model_id, process_model_display_name=model_display_name, process_model_description=model_description, ) process_model = ProcessModelService().get_process_model(model_id) assert model_display_name == process_model.display_name assert 0 == process_model.display_order assert 1 == len(ProcessModelService().get_process_groups()) self.create_spec_file(client) def test_process_model_delete( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_delete.""" self.create_process_model(client) # assert we have a model process_model = ProcessModelService().get_process_model("make_cookies") assert process_model is not None assert process_model.id == "make_cookies" # delete the model user = self.find_or_create_user() response = client.delete( f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert response.json["ok"] is True # assert we no longer have a model with pytest.raises(ProcessEntityNotFoundError): ProcessModelService().get_process_model("make_cookies") def test_process_model_delete_with_instances( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_delete_with_instances.""" test_process_group_id = "runs_without_input" test_process_model_id = "sample" user = self.find_or_create_user() headers = logged_in_headers(user) # create an instance from a model response = self.create_process_instance( client, test_process_group_id, test_process_model_id, headers ) data = json.loads(response.get_data(as_text=True)) # make sure the instance has the correct model assert data["process_model_identifier"] == test_process_model_id # try to delete the model response = client.delete( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}", headers=logged_in_headers(user), ) # make sure we get an error in the response assert response.status_code == 400 data = json.loads(response.get_data(as_text=True)) assert data["code"] == "existing_instances" assert ( data["message"] == "We cannot delete the model `sample`, there are existing instances that depend on it." ) def test_process_model_update( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_update.""" self.create_process_model(client) process_model = ProcessModelService().get_process_model("make_cookies") assert process_model.id == "make_cookies" assert process_model.display_name == "Cooooookies" assert process_model.is_review is False process_model.display_name = "Updated Display Name" process_model.is_review = True user = self.find_or_create_user() response = client.put( f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}", headers=logged_in_headers(user), content_type="application/json", data=json.dumps(ProcessModelInfoSchema().dump(process_model)), ) assert response.status_code == 200 assert response.json is not None assert response.json["display_name"] == "Updated Display Name" assert response.json["is_review"] is False def test_process_model_list( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_list.""" # create a group group_id = "test_group" user = self.find_or_create_user() self.create_process_group(client, user, group_id) # add 5 models to the group for i in range(5): model_id = f"test_model_{i}" model_display_name = f"Test Model {i}" model_description = f"Test Model {i} Description" self.create_process_model( client, group_id, model_id, model_display_name, model_description ) # get all models response = client.get( f"/v1.0/process-groups/{group_id}/process-models", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 5 assert response.json["pagination"]["count"] == 5 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 1 # get first page, 1 per page response = client.get( f"/v1.0/process-groups/{group_id}/process-models?page=1&per_page=1", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["results"][0]["id"] == "test_model_0" assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 5 # get second page, 1 per page response = client.get( f"/v1.0/process-groups/{group_id}/process-models?page=2&per_page=1", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["results"][0]["id"] == "test_model_1" assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 5 # get first page, 3 per page response = client.get( f"/v1.0/process-groups/{group_id}/process-models?page=1&per_page=3", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 3 assert response.json["results"][0]["id"] == "test_model_0" assert response.json["pagination"]["count"] == 3 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 2 # get second page, 3 per page response = client.get( f"/v1.0/process-groups/{group_id}/process-models?page=2&per_page=3", headers=logged_in_headers(user), ) # there should only be 2 left assert response.json is not None assert len(response.json["results"]) == 2 assert response.json["results"][0]["id"] == "test_model_3" assert response.json["pagination"]["count"] == 2 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 2 def test_process_group_add( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_add_process_group.""" process_group = ProcessGroup( id="test", display_name="Another Test Category", display_order=0, admin=False, ) user = self.find_or_create_user() response = client.post( "/v1.0/process-groups", headers=logged_in_headers(user), content_type="application/json", data=json.dumps(ProcessGroupSchema().dump(process_group)), ) assert response.status_code == 201 # Check what is returned result = ProcessGroupSchema().loads(response.get_data(as_text=True)) assert result is not None assert result.display_name == "Another Test Category" assert result.id == "test" # Check what is persisted persisted = ProcessModelService().get_process_group("test") assert persisted.display_name == "Another Test Category" assert persisted.id == "test" def test_process_group_delete( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_group_delete.""" process_group_id = "test" process_group_display_name = "My Process Group" user = self.find_or_create_user() self.create_process_group( client, user, process_group_id, display_name=process_group_display_name ) persisted = ProcessModelService().get_process_group(process_group_id) assert persisted is not None assert persisted.id == process_group_id client.delete( f"/v1.0/process-groups/{process_group_id}", headers=logged_in_headers(user) ) with pytest.raises(ProcessEntityNotFoundError): ProcessModelService().get_process_group(process_group_id) def test_process_group_update( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test Process Group Update.""" group_id = "test_process_group" group_display_name = "Test Group" user = self.find_or_create_user() self.create_process_group( client, user, group_id, display_name=group_display_name ) process_group = ProcessModelService().get_process_group(group_id) assert process_group.display_name == group_display_name process_group.display_name = "Modified Display Name" response = client.put( f"/v1.0/process-groups/{group_id}", headers=logged_in_headers(user), content_type="application/json", data=json.dumps(ProcessGroupSchema().dump(process_group)), ) assert response.status_code == 200 process_group = ProcessModelService().get_process_group(group_id) assert process_group.display_name == "Modified Display Name" def test_process_group_list( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_group_list.""" # add 5 groups user = self.find_or_create_user() for i in range(5): group_id = f"test_process_group_{i}" group_display_name = f"Test Group {i}" self.create_process_group( client, user, group_id, display_name=group_display_name ) # get all groups response = client.get( "/v1.0/process-groups", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 5 assert response.json["pagination"]["count"] == 5 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 1 # get first page, one per page response = client.get( "/v1.0/process-groups?page=1&per_page=1", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["results"][0]["id"] == "test_process_group_0" assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 5 # get second page, one per page response = client.get( "/v1.0/process-groups?page=2&per_page=1", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["results"][0]["id"] == "test_process_group_1" assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 5 # get first page, 3 per page response = client.get( "/v1.0/process-groups?page=1&per_page=3", headers=logged_in_headers(user), ) assert response.json is not None assert len(response.json["results"]) == 3 assert response.json["results"][0]["id"] == "test_process_group_0" assert response.json["results"][1]["id"] == "test_process_group_1" assert response.json["results"][2]["id"] == "test_process_group_2" assert response.json["pagination"]["count"] == 3 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 2 # get second page, 3 per page response = client.get( "/v1.0/process-groups?page=2&per_page=3", headers=logged_in_headers(user), ) # there should only be 2 left assert response.json is not None assert len(response.json["results"]) == 2 assert response.json["results"][0]["id"] == "test_process_group_3" assert response.json["results"][1]["id"] == "test_process_group_4" assert response.json["pagination"]["count"] == 2 assert response.json["pagination"]["total"] == 5 assert response.json["pagination"]["pages"] == 2 def test_process_model_file_update_fails_if_no_file_given( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_file_update.""" self.create_spec_file(client) spec = load_test_spec("random_fact") data = {"key1": "THIS DATA"} user = self.find_or_create_user() response = client.put( f"/v1.0/process-models/{spec.process_group_id}/{spec.id}/file/random_fact.svg", data=data, follow_redirects=True, content_type="multipart/form-data", headers=logged_in_headers(user), ) assert response.status_code == 400 assert response.json is not None assert response.json["code"] == "no_file_given" def test_process_model_file_update_fails_if_contents_is_empty( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_file_update.""" self.create_spec_file(client) spec = load_test_spec("random_fact") data = {"file": (io.BytesIO(b""), "random_fact.svg")} user = self.find_or_create_user() response = client.put( f"/v1.0/process-models/{spec.process_group_id}/{spec.id}/file/random_fact.svg", data=data, follow_redirects=True, content_type="multipart/form-data", headers=logged_in_headers(user), ) assert response.status_code == 400 assert response.json is not None assert response.json["code"] == "file_contents_empty" def test_process_model_file_update( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_file_update.""" original_file = self.create_spec_file(client) spec = load_test_spec("random_fact") new_file_contents = b"THIS_IS_NEW_DATA" data = {"file": (io.BytesIO(new_file_contents), "random_fact.svg")} user = self.find_or_create_user() response = client.put( f"/v1.0/process-models/{spec.process_group_id}/{spec.id}/file/random_fact.svg", data=data, follow_redirects=True, content_type="multipart/form-data", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert response.json["ok"] response = client.get( f"/v1.0/process-models/{spec.process_group_id}/{spec.id}/file/random_fact.svg", headers=logged_in_headers(user), ) assert response.status_code == 200 updated_file = json.loads(response.get_data(as_text=True)) assert original_file != updated_file assert updated_file["file_contents"] == new_file_contents.decode() def test_get_file( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_file.""" user = self.find_or_create_user() test_process_group_id = "group_id1" process_model_dir_name = "hello_world" load_test_spec(process_model_dir_name, process_group_id=test_process_group_id) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/file/hello_world.bpmn", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert response.json["name"] == "hello_world.bpmn" assert response.json["process_group_id"] == "group_id1" assert response.json["process_model_id"] == "hello_world" def test_get_workflow_from_workflow_spec( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_workflow_from_workflow_spec.""" user = self.find_or_create_user() spec = load_test_spec("hello_world") response = client.post( f"/v1.0/process-models/{spec.process_group_id}/{spec.id}/process-instances", headers=logged_in_headers(user), ) assert response.status_code == 201 assert response.json is not None assert "hello_world" == response.json["process_model_identifier"] # assert('Task_GetName' == response.json['next_task']['name']) def test_get_process_groups_when_none( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_process_groups_when_none.""" user = self.find_or_create_user() response = client.get("/v1.0/process-groups", headers=logged_in_headers(user)) assert response.status_code == 200 assert response.json is not None assert response.json["results"] == [] def test_get_process_groups_when_there_are_some( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_process_groups_when_there_are_some.""" user = self.find_or_create_user() load_test_spec("hello_world") response = client.get("/v1.0/process-groups", headers=logged_in_headers(user)) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["total"] == 1 assert response.json["pagination"]["pages"] == 1 def test_get_process_group_when_found( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_process_group_when_found.""" user = self.find_or_create_user() test_process_group_id = "group_id1" process_model_dir_name = "hello_world" load_test_spec(process_model_dir_name, process_group_id=test_process_group_id) response = client.get( f"/v1.0/process-groups/{test_process_group_id}", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert response.json["id"] == test_process_group_id assert response.json["process_models"][0]["id"] == process_model_dir_name def test_get_process_model_when_found( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_process_model_when_found.""" user = self.find_or_create_user() test_process_group_id = "group_id1" process_model_dir_name = "hello_world" load_test_spec(process_model_dir_name, process_group_id=test_process_group_id) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert response.json["id"] == process_model_dir_name assert len(response.json["files"]) == 1 assert response.json["files"][0]["name"] == "hello_world.bpmn" def test_get_process_model_when_not_found( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_get_process_model_when_not_found.""" user = self.find_or_create_user() process_model_dir_name = "THIS_NO_EXISTS" group_id = self.create_process_group(client, user, "my_group") response = client.get( f"/v1.0/process-models/{group_id}/{process_model_dir_name}", headers=logged_in_headers(user), ) assert response.status_code == 400 assert response.json is not None assert response.json["code"] == "process_model_cannot_be_found" def test_process_instance_create( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_create.""" test_process_group_id = "runs_without_input" test_process_model_id = "sample" user = self.find_or_create_user() headers = logged_in_headers(user) response = self.create_process_instance( client, test_process_group_id, test_process_model_id, headers ) assert response.json is not None assert response.json["updated_at_in_seconds"] is not None assert response.json["status"] == "not_started" assert response.json["process_model_identifier"] == test_process_model_id def test_process_instance_run( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_run.""" process_group_id = "runs_without_input" process_model_id = "sample" user = self.find_or_create_user() headers = logged_in_headers(user) response = self.create_process_instance( client, process_group_id, process_model_id, headers ) assert response.json is not None process_instance_id = response.json["id"] response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.json is not None assert type(response.json["updated_at_in_seconds"]) is int assert response.json["updated_at_in_seconds"] > 0 assert response.json["status"] == "complete" assert response.json["process_model_identifier"] == process_model_id assert response.json["data"]["current_user"]["username"] == "test_user1" assert response.json["data"]["Mike"] == "Awesome" assert response.json["data"]["person"] == "Kevin" def test_message_start_when_starting_process_instance( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_message_start_when_starting_process_instance.""" # ensure process model is loaded in db load_test_spec( "message_receiver", process_model_source_directory="message_send_one_conversation", bpmn_file_name="message_receiver", ) user = self.find_or_create_user() message_model_identifier = "message_send" payload = { "topica": "the_topica_string", "topicb": "the_topicb_string", "andThis": "another_item_non_key", } response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=logged_in_headers(user), data=json.dumps({"payload": payload}), ) assert response.status_code == 200 json_data = response.json assert json_data assert json_data["status"] == "complete" process_instance_id = json_data["id"] process_instance = ProcessInstanceModel.query.filter_by( id=process_instance_id ).first() assert process_instance processor = ProcessInstanceProcessor(process_instance) process_instance_data = processor.get_data() assert process_instance_data assert process_instance_data["the_payload"] == payload def test_message_start_when_providing_message_to_running_process_instance( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_message_start_when_providing_message_to_running_process_instance.""" process_model = load_test_spec( "message_sender", process_model_source_directory="message_send_one_conversation", bpmn_file_name="message_sender", ) user = self.find_or_create_user() message_model_identifier = "message_response" payload = { "the_payload": { "topica": "the_payload.topica_string", "topicb": "the_payload.topicb_string", "andThis": "another_item_non_key", } } response = self.create_process_instance( client, process_model.process_group_id, process_model.id, logged_in_headers(user), ) assert response.json is not None process_instance_id = response.json["id"] response = client.post( f"/v1.0/process-models/{process_model.process_group_id}/" f"{process_model.id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.json is not None response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=logged_in_headers(user), data=json.dumps( {"payload": payload, "process_instance_id": process_instance_id} ), ) assert response.status_code == 200 json_data = response.json assert json_data assert json_data["status"] == "complete" process_instance_id = json_data["id"] process_instance = ProcessInstanceModel.query.filter_by( id=process_instance_id ).first() assert process_instance processor = ProcessInstanceProcessor(process_instance) process_instance_data = processor.get_data() assert process_instance_data assert process_instance_data["the_payload"] == payload def test_process_instance_can_be_terminated( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_message_start_when_providing_message_to_running_process_instance.""" # this task will wait on a catch event process_model = load_test_spec( "message_sender", process_model_source_directory="message_send_one_conversation", bpmn_file_name="message_sender", ) user = self.find_or_create_user() response = self.create_process_instance( client, process_model.process_group_id, process_model.id, logged_in_headers(user), ) assert response.json is not None process_instance_id = response.json["id"] response = client.post( f"/v1.0/process-models/{process_model.process_group_id}/" f"{process_model.id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None response = client.post( f"/v1.0/process-models/{process_model.process_group_id}/" f"{process_model.id}/process-instances/{process_instance_id}/terminate", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None process_instance = ProcessInstanceModel.query.filter_by( id=process_instance_id ).first() assert process_instance assert process_instance.status == "terminated" def test_process_instance_delete( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_delete.""" process_group_id = "my_process_group" process_model_id = "user_task" user = self.find_or_create_user() headers = logged_in_headers(user) response = self.create_process_instance( client, process_group_id, process_model_id, headers ) assert response.json is not None process_instance_id = response.json["id"] response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.json is not None task_events = ( db.session.query(TaskEventModel) .filter(TaskEventModel.process_instance_id == process_instance_id) .all() ) assert len(task_events) == 1 task_event = task_events[0] assert task_event.user_id == user.id delete_response = client.delete( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}", headers=logged_in_headers(user), ) assert delete_response.status_code == 200 def test_process_instance_run_user_task_creates_task_event( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_run_user_task.""" process_group_id = "my_process_group" process_model_id = "user_task" user = self.find_or_create_user() headers = logged_in_headers(user) response = self.create_process_instance( client, process_group_id, process_model_id, headers ) assert response.json is not None process_instance_id = response.json["id"] response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.json is not None task_events = ( db.session.query(TaskEventModel) .filter(TaskEventModel.process_instance_id == process_instance_id) .all() ) assert len(task_events) == 1 task_event = task_events[0] assert task_event.user_id == user.id # TODO: When user tasks work, we need to add some more assertions for action, task_state, etc. def test_process_instance_list_with_default_list( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_list_with_default_list.""" test_process_group_id = "runs_without_input" process_model_dir_name = "sample" user = self.find_or_create_user() headers = logged_in_headers(user) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["pages"] == 1 assert response.json["pagination"]["total"] == 1 process_instance_dict = response.json["results"][0] assert type(process_instance_dict["id"]) is int assert ( process_instance_dict["process_model_identifier"] == process_model_dir_name ) assert ( process_instance_dict["process_group_identifier"] == test_process_group_id ) assert type(process_instance_dict["start_in_seconds"]) is int assert process_instance_dict["start_in_seconds"] > 0 assert process_instance_dict["end_in_seconds"] is None assert process_instance_dict["status"] == "not_started" def test_process_instance_list_with_paginated_items( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_list_with_paginated_items.""" test_process_group_id = "runs_without_input" process_model_dir_name = "sample" user = self.find_or_create_user() headers = logged_in_headers(user) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) self.create_process_instance( client, test_process_group_id, process_model_dir_name, headers ) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances?per_page=2&page=3", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 1 assert response.json["pagination"]["count"] == 1 assert response.json["pagination"]["pages"] == 3 assert response.json["pagination"]["total"] == 5 response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances?per_page=2&page=1", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 2 assert response.json["pagination"]["count"] == 2 assert response.json["pagination"]["pages"] == 3 assert response.json["pagination"]["total"] == 5 def test_process_instance_list_filter( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_list_filter.""" test_process_group_id = "runs_without_input" test_process_model_id = "sample" user = self.find_or_create_user() load_test_spec(test_process_model_id, process_group_id=test_process_group_id) statuses = [status.value for status in ProcessInstanceStatus] # create 5 instances with different status, and different start_in_seconds/end_in_seconds for i in range(5): process_instance = ProcessInstanceModel( status=ProcessInstanceStatus[statuses[i]].value, process_initiator=user, process_model_identifier=test_process_model_id, process_group_identifier=test_process_group_id, updated_at_in_seconds=round(time.time()), start_in_seconds=(1000 * i) + 1000, end_in_seconds=(1000 * i) + 2000, bpmn_json=json.dumps({"i": i}), ) db.session.add(process_instance) db.session.commit() # Without filtering we should get all 5 instances response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 5 # filter for each of the status # we should get 1 instance each time for i in range(5): response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances?process_status={ProcessInstanceStatus[statuses[i]].value}", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 1 assert results[0]["status"] == ProcessInstanceStatus[statuses[i]].value # filter by start/end seconds # start > 1000 - this should eliminate the first response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances?start_from=1001", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 4 for i in range(4): assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3, 4) # start > 2000, end < 5000 - this should eliminate the first 2 and the last response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances?start_from=2001&end_till=5999", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 2 assert json.loads(results[0]["bpmn_json"])["i"] in (2, 3) assert json.loads(results[1]["bpmn_json"])["i"] in (2, 3) # start > 1000, start < 4000 - this should eliminate the first and the last 2 response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances?start_from=1001&start_till=3999", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 2 assert json.loads(results[0]["bpmn_json"])["i"] in (1, 2) assert json.loads(results[1]["bpmn_json"])["i"] in (1, 2) # end > 2000, end < 6000 - this should eliminate the first and the last response = client.get( f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances?end_from=2001&end_till=5999", headers=logged_in_headers(user), ) assert response.json is not None results = response.json["results"] assert len(results) == 3 for i in range(3): assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3) def test_process_instance_report_list( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_instance_report_list.""" process_group_identifier = "runs_without_input" process_model_identifier = "sample" user = self.find_or_create_user() logged_in_headers(user) load_test_spec( process_model_identifier, process_group_id=process_group_identifier ) report_identifier = "testreport" report_metadata = {"order_by": ["month"]} ProcessInstanceReportModel.create_with_attributes( identifier=report_identifier, process_group_identifier=process_group_identifier, process_model_identifier=process_model_identifier, report_metadata=report_metadata, user=user, ) response = client.get( f"/v1.0/process-models/{process_group_identifier}/{process_model_identifier}/process-instances/reports", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json) == 1 assert response.json[0]["identifier"] == report_identifier assert response.json[0]["report_metadata"]["order_by"] == ["month"] def test_process_instance_report_show_with_default_list( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, setup_process_instances_for_reports: list[ProcessInstanceModel], ) -> None: """Test_process_instance_report_show_with_default_list.""" test_process_group_id = "runs_without_input" process_model_dir_name = "sample" user = self.find_or_create_user() report_metadata = { "columns": [ {"Header": "id", "accessor": "id"}, { "Header": "process_model_identifier", "accessor": "process_model_identifier", }, {"Header": "process_group_id", "accessor": "process_group_identifier"}, {"Header": "start_in_seconds", "accessor": "start_in_seconds"}, {"Header": "status", "accessor": "status"}, {"Header": "Name", "accessor": "name"}, {"Header": "Status", "accessor": "status"}, ], "order_by": ["test_score"], "filter_by": [ {"field_name": "grade_level", "operator": "equals", "field_value": 2} ], } ProcessInstanceReportModel.create_with_attributes( identifier="sure", process_group_identifier=test_process_group_id, process_model_identifier=process_model_dir_name, report_metadata=report_metadata, user=self.find_or_create_user(), ) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances/reports/sure", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 2 assert response.json["pagination"]["count"] == 2 assert response.json["pagination"]["pages"] == 1 assert response.json["pagination"]["total"] == 2 process_instance_dict = response.json["results"][0] assert type(process_instance_dict["id"]) is int assert ( process_instance_dict["process_model_identifier"] == process_model_dir_name ) assert ( process_instance_dict["process_group_identifier"] == test_process_group_id ) assert type(process_instance_dict["start_in_seconds"]) is int assert process_instance_dict["start_in_seconds"] > 0 assert process_instance_dict["status"] == "complete" def test_process_instance_report_show_with_dynamic_filter_and_query_param( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, setup_process_instances_for_reports: list[ProcessInstanceModel], ) -> None: """Test_process_instance_report_show_with_default_list.""" test_process_group_id = "runs_without_input" process_model_dir_name = "sample" user = self.find_or_create_user() report_metadata = { "filter_by": [ { "field_name": "grade_level", "operator": "equals", "field_value": "{{grade_level}}", } ], } ProcessInstanceReportModel.create_with_attributes( identifier="sure", process_group_identifier=test_process_group_id, process_model_identifier=process_model_dir_name, report_metadata=report_metadata, user=self.find_or_create_user(), ) response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances/reports/sure?grade_level=1", headers=logged_in_headers(user), ) assert response.status_code == 200 assert response.json is not None assert len(response.json["results"]) == 1 def test_process_instance_report_show_with_bad_identifier( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, setup_process_instances_for_reports: list[ProcessInstanceModel], ) -> None: """Test_process_instance_report_show_with_default_list.""" test_process_group_id = "runs_without_input" process_model_dir_name = "sample" user = self.find_or_create_user() response = client.get( f"/v1.0/process-models/{test_process_group_id}/{process_model_dir_name}/process-instances/reports/sure?grade_level=1", headers=logged_in_headers(user), ) assert response.status_code == 404 data = json.loads(response.get_data(as_text=True)) assert data["code"] == "unknown_process_instance_report" def setup_testing_instance( self, client: FlaskClient, process_group_id: str, process_model_id: str, user: UserModel, ) -> Any: """Setup_testing_instance.""" headers = logged_in_headers(user) response = self.create_process_instance( client, process_group_id, process_model_id, headers ) process_instance = response.json assert isinstance(process_instance, dict) process_instance_id = process_instance["id"] return process_instance_id def test_error_handler( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_error_handler.""" process_group_id = "data" process_model_id = "error" user = self.find_or_create_user() process_instance_id = self.setup_testing_instance( client, process_group_id, process_model_id, user ) process = ( db.session.query(ProcessInstanceModel) .filter(ProcessInstanceModel.id == process_instance_id) .first() ) assert process.status == "not_started" response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.status_code == 400 api_error = json.loads(response.get_data(as_text=True)) assert api_error["code"] == "unknown_exception" assert "An unknown error occurred." in api_error["message"] assert ( 'Original error: ApiError: Activity_CauseError: TypeError:can only concatenate str (not "int") to str.' in api_error["message"] ) assert ( "Error in task 'Cause Error' (Activity_CauseError)." in api_error["message"] ) assert "Error is on line 1. In file error.bpmn." in api_error["message"] process = ( db.session.query(ProcessInstanceModel) .filter(ProcessInstanceModel.id == process_instance_id) .first() ) assert process.status == "faulted" def test_error_handler_suspend( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_error_handler_suspend.""" process_group_id = "data" process_model_id = "error" user = self.find_or_create_user() process_instance_id = self.setup_testing_instance( client, process_group_id, process_model_id, user ) process_model = ProcessModelService().get_process_model( process_model_id, process_group_id ) ProcessModelService().update_spec( process_model, {"fault_or_suspend_on_exception": NotificationType.suspend.value}, ) process = ( db.session.query(ProcessInstanceModel) .filter(ProcessInstanceModel.id == process_instance_id) .first() ) assert process.status == "not_started" response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.status_code == 400 process = ( db.session.query(ProcessInstanceModel) .filter(ProcessInstanceModel.id == process_instance_id) .first() ) assert process.status == "suspended" def test_error_handler_with_email( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_error_handler.""" process_group_id = "data" process_model_id = "error" user = self.find_or_create_user() process_instance_id = self.setup_testing_instance( client, process_group_id, process_model_id, user ) process_model = ProcessModelService().get_process_model( process_model_id, process_group_id ) ProcessModelService().update_spec( process_model, {"exception_notification_addresses": ["user@example.com"]} ) mail = app.config["MAIL_APP"] with mail.record_messages() as outbox: response = client.post( f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", headers=logged_in_headers(user), ) assert response.status_code == 400 assert len(outbox) == 1 message = outbox[0] assert message.subject == "Unexpected error in app" assert ( message.body == 'Activity_CauseError: TypeError:can only concatenate str (not "int") to str' ) assert message.recipients == process_model.exception_notification_addresses process = ( db.session.query(ProcessInstanceModel) .filter(ProcessInstanceModel.id == process_instance_id) .first() ) assert process.status == "faulted" def test_process_model_file_create( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None ) -> None: """Test_process_model_file_create.""" process_group_id = "hello_world" process_model_id = "hello_world" file_name = "hello_world.svg" file_data = b"abc123" result = self.create_spec_file( client, process_group_id=process_group_id, process_model_id=process_model_id, file_name=file_name, file_data=file_data, ) assert result["process_group_id"] == process_group_id assert result["process_model_id"] == process_model_id assert result["name"] == file_name assert bytes(str(result["file_contents"]), "utf-8") == file_data # def test_get_process_model(self): # # load_test_spec('random_fact') # response = client.get('/v1.0/workflow-specification/random_fact', headers=logged_in_headers()) # assert_success(response) # json_data = json.loads(response.get_data(as_text=True)) # api_spec = WorkflowSpecInfoSchema().load(json_data) # # fs_spec = process_model_service.get_spec('random_fact') # assert(WorkflowSpecInfoSchema().dump(fs_spec) == json_data) # def test_waku_debug_info(self) -> None: """Test_waku_debug_info.""" debug_info_method = "get_waku_v2_debug_v1_info" headers = {"Content-Type": "application/json"} rpc_json = { "jsonrpc": "2.0", "method": debug_info_method, "params": [], "id": "id", } request_url = "http://localhost:8545" rpc_response = requests.post(request_url, headers=headers, json=rpc_json) rpc_json_text: dict = json.loads(rpc_response.text) assert isinstance(rpc_json_text, dict) # assert 'jsonrpc' in rpc_json_text # assert rpc_json_text['jsonrpc'] == '2.0' assert "result" in rpc_json_text result = rpc_json_text["result"] assert isinstance(result, dict) assert "listenAddresses" in result assert "enrUri" in result print("test_call_waku") def test_send_message(self) -> None: """Test_send_message.""" relay_message_method = "post_waku_v2_relay_v1_message" headers = {"Content-Type": "application/json"} # class WakuMessage: # payload: str # contentTopic: str # Optional # # version: int # Optional # timestamp: int # Optional payload = "This is my message" contentTopic = "myTestTopic" # noqa: N806 timestamp = time.time() waku_relay_message = { "payload": payload, "contentTopic": contentTopic, "timestamp": timestamp, } # ["", [{"contentTopic":"/waku/2/default-content/proto"}]] params = ["/waku/2/default-waku/proto", {"message": waku_relay_message}] rpc_json = { "jsonrpc": "2.0", "method": relay_message_method, "params": params, "id": 1, } request_url = "http://localhost:8545" rpc_response = requests.post(request_url, headers=headers, json=rpc_json) assert rpc_response.status_code == 200 rpc_json_data: dict = json.loads(rpc_response.text) assert "error" in rpc_json_data assert "result" in rpc_json_data assert rpc_json_data["error"] is None assert rpc_json_data["result"] is True print("test_send_message") def test_get_waku_messages(self) -> None: """Test_get_waku_messages.""" method = "get_waku_v2_store_v1_messages" headers = {"Content-Type": "application/json"} params = [{"contentTopic": "/waku/2/default-content/proto"}] rpc_json = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1} request_url = "http://localhost:8545" rpc_response = requests.post(request_url, headers=headers, json=rpc_json) assert rpc_response.status_code == 200 rpc_json_data: dict = json.loads(rpc_response.text) assert "error" in rpc_json_data assert rpc_json_data["error"] is None assert "result" in rpc_json_data assert isinstance(rpc_json_data["result"], dict) assert "messages" in rpc_json_data["result"] assert "pagingInfo" in rpc_json_data["result"] print("get_waku_messages")