jasquat 18600189c8 Feature/background proc with celery (#788)
* WIP: some initial test code to test out celery w/ burnettk

* some cleanup for celery and added base model to put tasks waiting on timers

* removed dup bpmn file

* some more cleanup and added strategy to queue instructions

* some minor code changes w/ burnettk

* remove the unused next_task key from api calls since nobody uses it w/ burnettk essweine

* added migration for future tasks and added test to make sure we are inserting into it w/ burnettk essweine

* ensure future task run at time can be updated w/ burnettk

* added table to queue instructions for end user in w/ burnettk

* added test to ensure we are storing instructions for end users w/ burnettk

* added progress page to display new instructions to user

* ignore dup instructions on db insert w/ burnettk

* some more updates for celery w/ burnettk

* some pyl and test fixes w/ burnettk

* fixed tests w/ burnettk

* WIP: added in page to show instructions on pi show page w/ burnettk

* pi show page is fully using not interstitial now w/ burnettk

* fixed broken test w/ burnettk

* moved background processing items to own module w/ burnettk

* fixed apscheduler start script

* updated celery task queue to handle future tasks and upgraded black and set its line-length to match ruff w/ burnettk

* added support to run future tasks using countdown w/ burnettk

* build image for celery branch w/ burnettk

* poet does not exist in the image w/ burnettk

* start blocking scheduler should always start the scheduler w/ burnettk

* add init and stuff for this branch

* make this work not just on my mac

* send other args to only

* added running status for process instance and use that on fe to go to show page and added additional identifier to locking system to isolate celery workers better w/ burnettk

* fixed typing error that typeguard found, not sure why mypy did not w/ burnettk

* do not check for no instructions on interstitial page for cypress tests on frontend w/ burnettk

* do not queue process instances twice w/ burnettk

* removed bad file w/ burnettk

* queue tasks using strings to avoid circular imports when attmepting to queue w/ burnettk

* only queue imminent new timer events and mock celery

* some keyboard shortcut support on frontend and added ability to force run a process instance over the api w/ burnettk

* some styles added for the shortcut menu w/ burnettk

* pyl w/ burnettk

* fixed test w/ burnettk

* removed temporary celery script and added support for celery worker in run server locally w/ burnettk

* cleaned up migrations w/ burnettk

* created new migration to clean up old migrations

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
Co-authored-by: burnettk <burnettk@users.noreply.github.com>
2023-12-05 11:41:59 -05:00

550 lines
22 KiB
Python

import io
import json
import os
import time
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any
from flask import current_app
from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.permission_assignment import Permission
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel
from spiffworkflow_backend.models.process_instance_report import FilterValue
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
from spiffworkflow_backend.models.process_instance_report import ReportMetadata
from spiffworkflow_backend.models.process_model import NotificationType
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
from werkzeug.test import TestResponse # type: ignore
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
# from tests.spiffworkflow_backend.helpers.test_data import logged_in_headers
class BaseTest:
@staticmethod
def find_or_create_user(username: str = "test_user_1") -> UserModel:
user = UserModel.query.filter_by(username=username).first()
if isinstance(user, UserModel):
return user
user = UserService.create_user(username, "internal", username)
if isinstance(user, UserModel):
return user
raise ApiError(
error_code="create_user_error",
message=f"Cannot find or create user: {username}",
)
@staticmethod
def logged_in_headers(
user: UserModel, _redirect_url: str = "http://some/frontend/url", extra_token_payload: dict | None = None
) -> dict[str, str]:
return {"Authorization": "Bearer " + user.encode_auth_token(extra_token_payload)}
def create_group_and_model_with_bpmn(
self,
client: FlaskClient,
user: UserModel,
process_group_id: str | None = "test_group",
process_model_id: str | None = "random_fact",
bpmn_file_name: str | None = None,
bpmn_file_location: str | None = None,
) -> ProcessModelInfo:
"""Creates a process group.
Creates a process model
Adds a bpmn file to the model.
"""
process_group_display_name = process_group_id or ""
process_group_description = process_group_id or ""
process_model_identifier = f"{process_group_id}/{process_model_id}"
if bpmn_file_location is None:
bpmn_file_location = process_model_id
self.create_process_group_with_api(client, user, process_group_description, process_group_display_name)
self.create_process_model_with_api(
client,
process_model_id=process_model_identifier,
process_model_display_name=process_group_display_name,
process_model_description=process_group_description,
user=user,
)
process_model = load_test_spec(
process_model_id=process_model_identifier,
bpmn_file_name=bpmn_file_name,
process_model_source_directory=bpmn_file_location,
)
return process_model
def create_process_group(
self,
process_group_id: str,
display_name: str = "",
) -> ProcessGroup:
process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False)
return ProcessModelService.add_process_group(process_group)
def create_process_group_with_api(
self,
client: FlaskClient,
user: Any,
process_group_id: str,
display_name: str = "",
) -> str:
process_group = ProcessGroup(id=process_group_id, display_name=display_name, display_order=0, admin=False)
response = client.post(
"/v1.0/process-groups",
headers=self.logged_in_headers(user),
content_type="application/json",
data=json.dumps(ProcessGroupSchema().dump(process_group)),
)
assert response.status_code == 201
assert response.json is not None
assert response.json["id"] == process_group_id
return process_group_id
def create_process_model_with_api(
self,
client: FlaskClient,
process_model_id: str | None = None,
process_model_display_name: str = "Cooooookies",
process_model_description: str = "Om nom nom delicious cookies",
fault_or_suspend_on_exception: str = NotificationType.suspend.value,
exception_notification_addresses: list | None = None,
primary_process_id: str | None = None,
primary_file_name: str | None = None,
user: UserModel | None = None,
) -> TestResponse:
if process_model_id is not None:
# make sure we have a group
process_group_id, _ = os.path.split(process_model_id)
modified_process_group_id = process_group_id.replace("/", ":")
process_group_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_group_id))
if ProcessModelService.is_process_group(process_group_path):
if exception_notification_addresses is None:
exception_notification_addresses = []
model = ProcessModelInfo(
id=process_model_id,
display_name=process_model_display_name,
description=process_model_description,
primary_process_id=primary_process_id,
primary_file_name=primary_file_name,
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
exception_notification_addresses=exception_notification_addresses,
)
if user is None:
user = self.find_or_create_user()
response = client.post(
f"/v1.0/process-models/{modified_process_group_id}",
content_type="application/json",
data=json.dumps(ProcessModelInfoSchema().dump(model)),
headers=self.logged_in_headers(user),
)
assert response.status_code == 201
return response
else:
raise Exception("You must create the group first")
else:
raise Exception("You must include the process_model_id, which must be a path to the model")
def get_test_data_file_full_path(self, file_name: str, process_model_test_data_dir: str) -> str:
return os.path.join(
current_app.instance_path,
"..",
"..",
"tests",
"data",
process_model_test_data_dir,
file_name,
)
def get_test_data_file_contents(self, file_name: str, process_model_test_data_dir: str) -> bytes:
file_full_path = self.get_test_data_file_full_path(file_name, process_model_test_data_dir)
with open(file_full_path, "rb") as file:
return file.read()
def create_spec_file(
self,
client: FlaskClient,
process_model_id: str,
process_model_location: str | None = None,
process_model: ProcessModelInfo | None = None,
file_name: str = "random_fact.bpmn",
file_data: bytes = b"abcdef",
user: UserModel | None = None,
) -> Any:
"""Test_create_spec_file.
Adds a bpmn file to the model.
process_model_id is the destination path
process_model_location is the source path
because of permissions, user might be required now..., not sure yet.
"""
if process_model_location is None:
process_model_location = file_name.split(".")[0]
if process_model is None:
process_model = load_test_spec(
process_model_id=process_model_id,
bpmn_file_name=file_name,
process_model_source_directory=process_model_location,
)
data = {"file": (io.BytesIO(file_data), file_name)}
if user is None:
user = self.find_or_create_user()
modified_process_model_id = process_model.id.replace("/", ":")
response = client.post(
f"/v1.0/process-models/{modified_process_model_id}/files",
data=data,
follow_redirects=True,
content_type="multipart/form-data",
headers=self.logged_in_headers(user),
)
assert response.status_code == 201
assert response.get_data() is not None
file = json.loads(response.get_data(as_text=True))
# assert FileType.svg.value == file["type"]
# assert "image/svg+xml" == file["content_type"]
response = client.get(
f"/v1.0/process-models/{modified_process_model_id}/files/{file_name}",
headers=self.logged_in_headers(user),
)
assert response.status_code == 200
file2 = json.loads(response.get_data(as_text=True))
assert file["file_contents"] == file2["file_contents"]
return file
@staticmethod
def create_process_instance_from_process_model_id_with_api(
client: FlaskClient,
test_process_model_id: str,
headers: dict[str, str],
) -> TestResponse:
"""Create_process_instance.
There must be an existing process model to instantiate.
"""
if not ProcessModelService.is_process_model_identifier(test_process_model_id):
dirname = os.path.dirname(test_process_model_id)
if not ProcessModelService.is_process_group_identifier(dirname):
process_group = ProcessGroup(id=dirname, display_name=dirname)
ProcessModelService.add_process_group(process_group)
basename = os.path.basename(test_process_model_id)
load_test_spec(
process_model_id=test_process_model_id,
process_model_source_directory=basename,
bpmn_file_name=basename,
)
modified_process_model_id = test_process_model_id.replace("/", ":")
response = client.post(
f"/v1.0/process-instances/{modified_process_model_id}",
headers=headers,
)
assert response.status_code == 201
return response
# @staticmethod
# def get_public_access_token(username: str, password: str) -> dict:
# """Get_public_access_token."""
# public_access_token = AuthenticationService().get_public_access_token(
# username, password
# )
# return public_access_token
def create_process_instance_from_process_model(
self,
process_model: ProcessModelInfo,
status: str | None = "not_started",
user: UserModel | None = None,
) -> ProcessInstanceModel:
if user is None:
user = self.find_or_create_user()
current_time = round(time.time())
process_instance = ProcessInstanceModel(
status=status,
process_initiator=user,
process_model_identifier=process_model.id,
process_model_display_name=process_model.display_name,
updated_at_in_seconds=round(time.time()),
start_in_seconds=current_time - (3600 * 1),
end_in_seconds=current_time - (3600 * 1 - 20),
)
db.session.add(process_instance)
db.session.commit()
run_at_in_seconds = round(time.time())
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds)
return process_instance
@classmethod
def create_user_with_permission(
cls,
username: str,
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: list[str] | None = None,
grant_type: str = "permit",
) -> UserModel:
user = BaseTest.find_or_create_user(username=username)
return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names, grant_type=grant_type)
@classmethod
def add_permissions_to_user(
cls,
user: UserModel,
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: list[str] | None = None,
grant_type: str = "permit",
) -> UserModel:
principal = user.principal
cls.add_permissions_to_principal(
principal, target_uri=target_uri, permission_names=permission_names, grant_type=grant_type
)
return user
@classmethod
def add_permissions_to_principal(
cls, principal: PrincipalModel, target_uri: str, permission_names: list[str] | None, grant_type: str = "permit"
) -> None:
permission_target = AuthorizationService.find_or_create_permission_target(target_uri)
if permission_names is None:
permission_names = [member.name for member in Permission]
for permission in permission_names:
AuthorizationService.create_permission_for_principal(
principal=principal,
permission_target=permission_target,
permission=permission,
grant_type=grant_type,
)
def assert_user_has_permission(
self,
user: UserModel,
permission: str,
target_uri: str,
expected_result: bool = True,
) -> None:
has_permission = AuthorizationService.user_has_permission(
user=user,
permission=permission,
target_uri=target_uri,
)
assert has_permission is expected_result
def modify_process_identifier_for_path_param(self, identifier: str) -> str:
return ProcessModelInfo.modify_process_identifier_for_path_param(identifier)
def un_modify_modified_process_identifier_for_path_param(self, modified_identifier: str) -> str:
return modified_identifier.replace(":", "/")
def create_process_model_with_metadata(self) -> ProcessModelInfo:
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
"test_group/hello_world",
process_model_source_directory="nested-task-data-structure",
)
ProcessModelService.update_process_model(
process_model,
{
"metadata_extraction_paths": [
{"key": "awesome_var", "path": "outer.inner"},
{"key": "invoice_number", "path": "invoice_number"},
]
},
)
return process_model
def post_to_process_instance_list(
self,
client: FlaskClient,
user: UserModel,
report_metadata: ReportMetadata | None = None,
param_string: str | None = "",
) -> TestResponse:
report_metadata_to_use = report_metadata
if report_metadata_to_use is None:
report_metadata_to_use = self.empty_report_metadata_body()
response = client.post(
f"/v1.0/process-instances{param_string}",
headers=self.logged_in_headers(user),
content_type="application/json",
data=json.dumps({"report_metadata": report_metadata_to_use}),
)
assert response.status_code == 200
assert response.json is not None
return response
def empty_report_metadata_body(self) -> ReportMetadata:
return {"filter_by": [], "columns": [], "order_by": []}
def start_sender_process(
self,
client: FlaskClient,
payload: dict,
group_name: str = "test_group",
) -> ProcessInstanceModel:
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
process_instance = self.create_process_instance_from_process_model(process_model)
processor_send_receive = ProcessInstanceProcessor(process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
payload,
process_instance.process_initiator,
human_task,
)
processor_send_receive.save()
return process_instance
def assure_a_message_was_sent(self, process_instance: ProcessInstanceModel, payload: dict) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert send_message.payload == payload, "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self, process_instance: ProcessInstanceModel) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
def create_process_instance_with_synthetic_metadata(
self, process_model: ProcessModelInfo, process_instance_metadata_dict: dict
) -> ProcessInstanceModel:
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
for key, value in process_instance_metadata_dict.items():
process_instance_metadata = ProcessInstanceMetadataModel(
process_instance_id=process_instance.id,
key=key,
value=value,
)
db.session.add(process_instance_metadata)
db.session.commit()
return process_instance
def assert_report_with_process_metadata_operator_includes_instance(
self,
client: FlaskClient,
user: UserModel,
process_instance: ProcessInstanceModel,
operator: str,
filter_field_value: str = "",
filters: list[FilterValue] | None = None,
expect_to_find_instance: bool = True,
) -> None:
if filters is None:
filters = []
first_filter: FilterValue = {"field_name": "key1", "field_value": filter_field_value, "operator": operator}
filters.append(first_filter)
report_metadata: ReportMetadata = {
"columns": [
{"Header": "ID", "accessor": "id", "filterable": False},
{"Header": "Key one", "accessor": "key1", "filterable": False},
{"Header": "Key two", "accessor": "key2", "filterable": False},
],
"order_by": ["status"],
"filter_by": filters,
}
process_instance_report = ProcessInstanceReportModel.create_report(
identifier=f"{process_instance.id}_sure",
report_metadata=report_metadata,
user=user,
)
response = self.post_to_process_instance_list(client, user, report_metadata=process_instance_report.get_report_metadata())
if expect_to_find_instance is True:
assert len(response.json["results"]) == 1
assert response.json["results"][0]["id"] == process_instance.id
else:
if len(response.json["results"]) == 1:
first_result = response.json["results"][0]
assert (
first_result["id"] != process_instance.id
), f"expected not to find a specific process instance, but we found it: {first_result}"
else:
assert len(response.json["results"]) == 0
db.session.delete(process_instance_report)
db.session.commit()
def complete_next_manual_task(self, processor: ProcessInstanceProcessor) -> None:
user_task = processor.get_ready_user_tasks()[0]
human_task = processor.process_instance_model.human_tasks[0]
ProcessInstanceService.complete_form_task(
processor, user_task, {}, processor.process_instance_model.process_initiator, human_task
)
@contextmanager
def app_config_mock(self, app: Flask, config_identifier: str, new_config_value: Any) -> Generator:
initial_value = app.config[config_identifier]
app.config[config_identifier] = new_config_value
try:
yield
finally:
app.config[config_identifier] = initial_value