mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-01-12 03:04:12 +00:00
9bb9ce47f8
* WIP: some updates to support new spiff w/ burnettk * unit tests are passing * all tests except message tests are passing * fixed usage of catch message event w/ burnettk * messages are working again w/ burnettk * uncommented remaining message tests w/ burnettk * fixed cypress tests w/ burnettk * use main for spiffworkflow * translated mysql last milestone query to sqlalchemy w/ burnettk * fixed last milestone query so instances still return if no milestone found and moved some code from the main report method to own methods * added some comments * added last milestone column to process instances table * display last milestone in instance list table w/ burnettk * remove 3 characters when truncating last milestone for ellipsis * make sure we have a current processor so we don't return null * remove sleep * The background processor now only picks up processes that were last updated more than a minute ago to avoid conflicting with the interstitial page. With the understanding that we can rmeove this limitation when we can refactor to allow the backend processes to provide updates on what they are doing. * pyl w/ burnettk * cache last milestone on instances * pyl * added test for last milestone and added it to the proces instance show page w/ burnettk * fixed broken test w/ burnettk * fixed last milestone header * removed duplicated column * fixed broken test --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com> Co-authored-by: Kevin Burnett <18027+burnettk@users.noreply.github.com> Co-authored-by: danfunk <daniel.h.funk@gmail.com> Co-authored-by: burnettk <burnettk@users.noreply.github.com>
519 lines
21 KiB
Python
519 lines
21 KiB
Python
import io
|
|
import json
|
|
import os
|
|
import time
|
|
from collections.abc import Generator
|
|
from contextlib import contextmanager
|
|
from typing import Any
|
|
|
|
from flask import current_app
|
|
from flask.app import Flask
|
|
from flask.testing import FlaskClient
|
|
from spiffworkflow_backend.exceptions.api_error import ApiError
|
|
from spiffworkflow_backend.models.db import db
|
|
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
|
from spiffworkflow_backend.models.permission_assignment import Permission
|
|
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
|
from spiffworkflow_backend.models.process_group import ProcessGroup
|
|
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
|
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|
from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel
|
|
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
|
|
from spiffworkflow_backend.models.process_instance_report import ReportMetadata
|
|
from spiffworkflow_backend.models.process_model import NotificationType
|
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
|
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
|
from spiffworkflow_backend.models.user import UserModel
|
|
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
|
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
|
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
|
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
|
|
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
|
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
|
from spiffworkflow_backend.services.user_service import UserService
|
|
from werkzeug.test import TestResponse # type: ignore
|
|
|
|
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
|
|
|
# from tests.spiffworkflow_backend.helpers.test_data import logged_in_headers
|
|
|
|
|
|
class BaseTest:
|
|
@staticmethod
|
|
def find_or_create_user(username: str = "test_user_1") -> UserModel:
|
|
user = UserModel.query.filter_by(username=username).first()
|
|
if isinstance(user, UserModel):
|
|
return user
|
|
|
|
user = UserService.create_user(username, "internal", username)
|
|
if isinstance(user, UserModel):
|
|
return user
|
|
|
|
raise ApiError(
|
|
error_code="create_user_error",
|
|
message=f"Cannot find or create user: {username}",
|
|
)
|
|
|
|
@staticmethod
|
|
def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> dict[str, str]:
|
|
return {"Authorization": "Bearer " + user.encode_auth_token()}
|
|
|
|
def create_group_and_model_with_bpmn(
|
|
self,
|
|
client: FlaskClient,
|
|
user: UserModel,
|
|
process_group_id: str | None = "test_group",
|
|
process_model_id: str | None = "random_fact",
|
|
bpmn_file_name: str | None = None,
|
|
bpmn_file_location: str | None = None,
|
|
) -> ProcessModelInfo:
|
|
"""Creates a process group.
|
|
|
|
Creates a process model
|
|
Adds a bpmn file to the model.
|
|
"""
|
|
process_group_display_name = process_group_id or ""
|
|
process_group_description = process_group_id or ""
|
|
process_model_identifier = f"{process_group_id}/{process_model_id}"
|
|
if bpmn_file_location is None:
|
|
bpmn_file_location = process_model_id
|
|
|
|
self.create_process_group_with_api(client, user, process_group_description, process_group_display_name)
|
|
|
|
self.create_process_model_with_api(
|
|
client,
|
|
process_model_id=process_model_identifier,
|
|
process_model_display_name=process_group_display_name,
|
|
process_model_description=process_group_description,
|
|
user=user,
|
|
)
|
|
|
|
process_model = load_test_spec(
|
|
process_model_id=process_model_identifier,
|
|
bpmn_file_name=bpmn_file_name,
|
|
process_model_source_directory=bpmn_file_location,
|
|
)
|
|
|
|
return process_model
|
|
|
|
def create_process_group(
|
|
self,
|
|
process_group_id: str,
|
|
display_name: str = "",
|
|
) -> ProcessGroup:
|
|
process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False)
|
|
return ProcessModelService.add_process_group(process_group)
|
|
|
|
def create_process_group_with_api(
|
|
self,
|
|
client: FlaskClient,
|
|
user: Any,
|
|
process_group_id: str,
|
|
display_name: str = "",
|
|
) -> str:
|
|
process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False)
|
|
response = client.post(
|
|
"/v1.0/process-groups",
|
|
headers=self.logged_in_headers(user),
|
|
content_type="application/json",
|
|
data=json.dumps(ProcessGroupSchema().dump(process_group)),
|
|
)
|
|
assert response.status_code == 201
|
|
assert response.json is not None
|
|
assert response.json["id"] == process_group_id
|
|
return process_group_id
|
|
|
|
def create_process_model_with_api(
|
|
self,
|
|
client: FlaskClient,
|
|
process_model_id: str | None = None,
|
|
process_model_display_name: str = "Cooooookies",
|
|
process_model_description: str = "Om nom nom delicious cookies",
|
|
fault_or_suspend_on_exception: str = NotificationType.suspend.value,
|
|
exception_notification_addresses: list | None = None,
|
|
primary_process_id: str | None = None,
|
|
primary_file_name: str | None = None,
|
|
user: UserModel | None = None,
|
|
) -> TestResponse:
|
|
if process_model_id is not None:
|
|
# make sure we have a group
|
|
process_group_id, _ = os.path.split(process_model_id)
|
|
modified_process_group_id = process_group_id.replace("/", ":")
|
|
process_group_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_group_id))
|
|
if ProcessModelService.is_process_group(process_group_path):
|
|
if exception_notification_addresses is None:
|
|
exception_notification_addresses = []
|
|
|
|
model = ProcessModelInfo(
|
|
id=process_model_id,
|
|
display_name=process_model_display_name,
|
|
description=process_model_description,
|
|
primary_process_id=primary_process_id,
|
|
primary_file_name=primary_file_name,
|
|
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
|
|
exception_notification_addresses=exception_notification_addresses,
|
|
)
|
|
if user is None:
|
|
user = self.find_or_create_user()
|
|
|
|
response = client.post(
|
|
f"/v1.0/process-models/{modified_process_group_id}",
|
|
content_type="application/json",
|
|
data=json.dumps(ProcessModelInfoSchema().dump(model)),
|
|
headers=self.logged_in_headers(user),
|
|
)
|
|
|
|
assert response.status_code == 201
|
|
return response
|
|
|
|
else:
|
|
raise Exception("You must create the group first")
|
|
else:
|
|
raise Exception("You must include the process_model_id, which must be a path to the model")
|
|
|
|
def get_test_data_file_full_path(self, file_name: str, process_model_test_data_dir: str) -> str:
|
|
return os.path.join(
|
|
current_app.instance_path,
|
|
"..",
|
|
"..",
|
|
"tests",
|
|
"data",
|
|
process_model_test_data_dir,
|
|
file_name,
|
|
)
|
|
|
|
def get_test_data_file_contents(self, file_name: str, process_model_test_data_dir: str) -> bytes:
|
|
file_full_path = self.get_test_data_file_full_path(file_name, process_model_test_data_dir)
|
|
with open(file_full_path, "rb") as file:
|
|
return file.read()
|
|
|
|
def create_spec_file(
|
|
self,
|
|
client: FlaskClient,
|
|
process_model_id: str,
|
|
process_model_location: str | None = None,
|
|
process_model: ProcessModelInfo | None = None,
|
|
file_name: str = "random_fact.bpmn",
|
|
file_data: bytes = b"abcdef",
|
|
user: UserModel | None = None,
|
|
) -> Any:
|
|
"""Test_create_spec_file.
|
|
|
|
Adds a bpmn file to the model.
|
|
process_model_id is the destination path
|
|
process_model_location is the source path
|
|
|
|
because of permissions, user might be required now..., not sure yet.
|
|
"""
|
|
if process_model_location is None:
|
|
process_model_location = file_name.split(".")[0]
|
|
if process_model is None:
|
|
process_model = load_test_spec(
|
|
process_model_id=process_model_id,
|
|
bpmn_file_name=file_name,
|
|
process_model_source_directory=process_model_location,
|
|
)
|
|
data = {"file": (io.BytesIO(file_data), file_name)}
|
|
if user is None:
|
|
user = self.find_or_create_user()
|
|
modified_process_model_id = process_model.id.replace("/", ":")
|
|
response = client.post(
|
|
f"/v1.0/process-models/{modified_process_model_id}/files",
|
|
data=data,
|
|
follow_redirects=True,
|
|
content_type="multipart/form-data",
|
|
headers=self.logged_in_headers(user),
|
|
)
|
|
assert response.status_code == 201
|
|
assert response.get_data() is not None
|
|
file = json.loads(response.get_data(as_text=True))
|
|
# assert FileType.svg.value == file["type"]
|
|
# assert "image/svg+xml" == file["content_type"]
|
|
|
|
response = client.get(
|
|
f"/v1.0/process-models/{modified_process_model_id}/files/{file_name}",
|
|
headers=self.logged_in_headers(user),
|
|
)
|
|
assert response.status_code == 200
|
|
file2 = json.loads(response.get_data(as_text=True))
|
|
assert file["file_contents"] == file2["file_contents"]
|
|
return file
|
|
|
|
@staticmethod
|
|
def create_process_instance_from_process_model_id_with_api(
|
|
client: FlaskClient,
|
|
test_process_model_id: str,
|
|
headers: dict[str, str],
|
|
) -> TestResponse:
|
|
"""Create_process_instance.
|
|
|
|
There must be an existing process model to instantiate.
|
|
"""
|
|
if not ProcessModelService.is_process_model_identifier(test_process_model_id):
|
|
dirname = os.path.dirname(test_process_model_id)
|
|
if not ProcessModelService.is_process_group_identifier(dirname):
|
|
process_group = ProcessGroup(id=dirname, display_name=dirname)
|
|
ProcessModelService.add_process_group(process_group)
|
|
basename = os.path.basename(test_process_model_id)
|
|
load_test_spec(
|
|
process_model_id=test_process_model_id,
|
|
process_model_source_directory=basename,
|
|
bpmn_file_name=basename,
|
|
)
|
|
modified_process_model_id = test_process_model_id.replace("/", ":")
|
|
response = client.post(
|
|
f"/v1.0/process-instances/{modified_process_model_id}",
|
|
headers=headers,
|
|
)
|
|
assert response.status_code == 201
|
|
return response
|
|
|
|
# @staticmethod
|
|
# def get_public_access_token(username: str, password: str) -> dict:
|
|
# """Get_public_access_token."""
|
|
# public_access_token = AuthenticationService().get_public_access_token(
|
|
# username, password
|
|
# )
|
|
# return public_access_token
|
|
|
|
def create_process_instance_from_process_model(
|
|
self,
|
|
process_model: ProcessModelInfo,
|
|
status: str | None = "not_started",
|
|
user: UserModel | None = None,
|
|
) -> ProcessInstanceModel:
|
|
if user is None:
|
|
user = self.find_or_create_user()
|
|
|
|
current_time = round(time.time())
|
|
process_instance = ProcessInstanceModel(
|
|
status=status,
|
|
process_initiator=user,
|
|
process_model_identifier=process_model.id,
|
|
process_model_display_name=process_model.display_name,
|
|
updated_at_in_seconds=round(time.time()),
|
|
start_in_seconds=current_time - (3600 * 1),
|
|
end_in_seconds=current_time - (3600 * 1 - 20),
|
|
)
|
|
db.session.add(process_instance)
|
|
db.session.commit()
|
|
|
|
run_at_in_seconds = round(time.time())
|
|
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds)
|
|
|
|
return process_instance
|
|
|
|
@classmethod
|
|
def create_user_with_permission(
|
|
cls,
|
|
username: str,
|
|
target_uri: str = PermissionTargetModel.URI_ALL,
|
|
permission_names: list[str] | None = None,
|
|
) -> UserModel:
|
|
user = BaseTest.find_or_create_user(username=username)
|
|
return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names)
|
|
|
|
@classmethod
|
|
def add_permissions_to_user(
|
|
cls,
|
|
user: UserModel,
|
|
target_uri: str = PermissionTargetModel.URI_ALL,
|
|
permission_names: list[str] | None = None,
|
|
) -> UserModel:
|
|
permission_target = AuthorizationService.find_or_create_permission_target(target_uri)
|
|
|
|
if permission_names is None:
|
|
permission_names = [member.name for member in Permission]
|
|
|
|
for permission in permission_names:
|
|
AuthorizationService.create_permission_for_principal(
|
|
principal=user.principal,
|
|
permission_target=permission_target,
|
|
permission=permission,
|
|
)
|
|
return user
|
|
|
|
def assert_user_has_permission(
|
|
self,
|
|
user: UserModel,
|
|
permission: str,
|
|
target_uri: str,
|
|
expected_result: bool = True,
|
|
) -> None:
|
|
has_permission = AuthorizationService.user_has_permission(
|
|
user=user,
|
|
permission=permission,
|
|
target_uri=target_uri,
|
|
)
|
|
assert has_permission is expected_result
|
|
|
|
def modify_process_identifier_for_path_param(self, identifier: str) -> str:
|
|
return ProcessModelInfo.modify_process_identifier_for_path_param(identifier)
|
|
|
|
def un_modify_modified_process_identifier_for_path_param(self, modified_identifier: str) -> str:
|
|
return modified_identifier.replace(":", "/")
|
|
|
|
def create_process_model_with_metadata(self) -> ProcessModelInfo:
|
|
self.create_process_group("test_group", "test_group")
|
|
process_model = load_test_spec(
|
|
"test_group/hello_world",
|
|
process_model_source_directory="nested-task-data-structure",
|
|
)
|
|
ProcessModelService.update_process_model(
|
|
process_model,
|
|
{
|
|
"metadata_extraction_paths": [
|
|
{"key": "awesome_var", "path": "outer.inner"},
|
|
{"key": "invoice_number", "path": "invoice_number"},
|
|
]
|
|
},
|
|
)
|
|
return process_model
|
|
|
|
def post_to_process_instance_list(
|
|
self,
|
|
client: FlaskClient,
|
|
user: UserModel,
|
|
report_metadata: ReportMetadata | None = None,
|
|
param_string: str | None = "",
|
|
) -> TestResponse:
|
|
report_metadata_to_use = report_metadata
|
|
if report_metadata_to_use is None:
|
|
report_metadata_to_use = self.empty_report_metadata_body()
|
|
response = client.post(
|
|
f"/v1.0/process-instances{param_string}",
|
|
headers=self.logged_in_headers(user),
|
|
content_type="application/json",
|
|
data=json.dumps({"report_metadata": report_metadata_to_use}),
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json is not None
|
|
return response
|
|
|
|
def empty_report_metadata_body(self) -> ReportMetadata:
|
|
return {"filter_by": [], "columns": [], "order_by": []}
|
|
|
|
def start_sender_process(
|
|
self,
|
|
client: FlaskClient,
|
|
payload: dict,
|
|
group_name: str = "test_group",
|
|
) -> ProcessInstanceModel:
|
|
process_model = load_test_spec(
|
|
"test_group/message",
|
|
process_model_source_directory="message_send_one_conversation",
|
|
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
|
|
)
|
|
|
|
process_instance = self.create_process_instance_from_process_model(process_model)
|
|
processor_send_receive = ProcessInstanceProcessor(process_instance)
|
|
processor_send_receive.do_engine_steps(save=True)
|
|
task = processor_send_receive.get_all_user_tasks()[0]
|
|
human_task = process_instance.active_human_tasks[0]
|
|
|
|
ProcessInstanceService.complete_form_task(
|
|
processor_send_receive,
|
|
task,
|
|
payload,
|
|
process_instance.process_initiator,
|
|
human_task,
|
|
)
|
|
processor_send_receive.save()
|
|
return process_instance
|
|
|
|
def assure_a_message_was_sent(self, process_instance: ProcessInstanceModel, payload: dict) -> None:
|
|
# There should be one new send message for the given process instance.
|
|
send_messages = (
|
|
MessageInstanceModel.query.filter_by(message_type="send")
|
|
.filter_by(process_instance_id=process_instance.id)
|
|
.order_by(MessageInstanceModel.id)
|
|
.all()
|
|
)
|
|
assert len(send_messages) == 1
|
|
send_message = send_messages[0]
|
|
assert send_message.payload == payload, "The send message should match up with the payload"
|
|
assert send_message.name == "Request Approval"
|
|
assert send_message.status == "ready"
|
|
|
|
def assure_there_is_a_process_waiting_on_a_message(self, process_instance: ProcessInstanceModel) -> None:
|
|
# There should be one new send message for the given process instance.
|
|
waiting_messages = (
|
|
MessageInstanceModel.query.filter_by(message_type="receive")
|
|
.filter_by(status="ready")
|
|
.filter_by(process_instance_id=process_instance.id)
|
|
.order_by(MessageInstanceModel.id)
|
|
.all()
|
|
)
|
|
assert len(waiting_messages) == 1
|
|
waiting_message = waiting_messages[0]
|
|
self.assure_correlation_properties_are_right(waiting_message)
|
|
|
|
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
|
|
# Correlation Properties should match up
|
|
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
|
|
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
|
|
assert po_curr is not None
|
|
assert customer_curr is not None
|
|
|
|
def create_process_instance_with_synthetic_metadata(
|
|
self, process_model: ProcessModelInfo, process_instance_metadata_dict: dict
|
|
) -> ProcessInstanceModel:
|
|
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
|
|
processor = ProcessInstanceProcessor(process_instance)
|
|
processor.do_engine_steps(save=True)
|
|
for key, value in process_instance_metadata_dict.items():
|
|
process_instance_metadata = ProcessInstanceMetadataModel(
|
|
process_instance_id=process_instance.id,
|
|
key=key,
|
|
value=value,
|
|
)
|
|
db.session.add(process_instance_metadata)
|
|
db.session.commit()
|
|
return process_instance
|
|
|
|
def assert_report_with_process_metadata_operator_includes_instance(
|
|
self,
|
|
client: FlaskClient,
|
|
user: UserModel,
|
|
process_instance: ProcessInstanceModel,
|
|
operator: str,
|
|
filter_field_value: str = "",
|
|
) -> None:
|
|
report_metadata: ReportMetadata = {
|
|
"columns": [
|
|
{"Header": "ID", "accessor": "id", "filterable": False},
|
|
{"Header": "Key one", "accessor": "key1", "filterable": False},
|
|
{"Header": "Key two", "accessor": "key2", "filterable": False},
|
|
],
|
|
"order_by": ["status"],
|
|
"filter_by": [{"field_name": "key1", "field_value": filter_field_value, "operator": operator}],
|
|
}
|
|
process_instance_report = ProcessInstanceReportModel.create_report(
|
|
identifier=f"{process_instance.id}_sure",
|
|
report_metadata=report_metadata,
|
|
user=user,
|
|
)
|
|
response = self.post_to_process_instance_list(
|
|
client, user, report_metadata=process_instance_report.get_report_metadata()
|
|
)
|
|
assert len(response.json["results"]) == 1
|
|
assert response.json["results"][0]["id"] == process_instance.id
|
|
db.session.delete(process_instance_report)
|
|
db.session.commit()
|
|
|
|
def complete_next_manual_task(self, processor: ProcessInstanceProcessor) -> None:
|
|
user_task = processor.get_ready_user_tasks()[0]
|
|
human_task = processor.process_instance_model.human_tasks[0]
|
|
ProcessInstanceService.complete_form_task(
|
|
processor, user_task, {}, processor.process_instance_model.process_initiator, human_task
|
|
)
|
|
|
|
@contextmanager
|
|
def app_config_mock(self, app: Flask, config_identifier: str, new_config_value: Any) -> Generator:
|
|
initial_value = app.config[config_identifier]
|
|
app.config[config_identifier] = new_config_value
|
|
try:
|
|
yield
|
|
finally:
|
|
app.config[config_identifier] = initial_value
|