Added an init method to TaskService, moved a lot of code from workflow execution into it, and fixed up the task-running test to check things more thoroughly.

This commit is contained in:
jasquat 2023-03-30 11:15:27 -04:00
parent edc0ea83fa
commit e3b8653296
5 changed files with 268 additions and 141 deletions

View File

@ -1,6 +1,6 @@
"""Process_instance_processor."""
import copy
import _strptime # type: ignore
import copy
import decimal
import json
import logging
@ -1373,7 +1373,7 @@ class ProcessInstanceProcessor:
bpmn_process = to_task_model.bpmn_process
properties_json = copy.copy(bpmn_process.properties_json)
properties_json['last_task'] = parent_task_model.guid
properties_json["last_task"] = parent_task_model.guid
bpmn_process.properties_json = properties_json
db.session.add(bpmn_process)
db.session.commit()
@ -1818,6 +1818,13 @@ class ProcessInstanceProcessor:
user_id=user.id,
)
task_service = TaskService(
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
task_service.process_parents_and_children_and_save_to_database(spiff_task)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save()

View File

@ -1,5 +1,6 @@
import copy
import json
import time
from hashlib import sha256
from typing import Optional
from typing import Tuple
@ -20,6 +21,8 @@ from spiffworkflow_backend.models.bpmn_process import BpmnProcessNotFoundError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
@ -31,6 +34,135 @@ class JsonDataDict(TypedDict):
class TaskService:
PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state"
def __init__(
    self,
    process_instance: ProcessInstanceModel,
    serializer: BpmnWorkflowSerializer,
    bpmn_definition_to_task_definitions_mappings: dict,
) -> None:
    """Collect task and bpmn-process updates in memory for a later bulk save.

    The caches below are keyed by guid (or, for json data, by content hash)
    and are flushed to the database by save_objects_to_database(). No
    database work happens during construction.
    """
    self.process_instance = process_instance
    self.serializer = serializer
    self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings

    # Pending-write caches, populated by update_task_model_with_spiff_task and friends.
    self.bpmn_processes: dict[str, BpmnProcessModel] = {}
    self.task_models: dict[str, TaskModel] = {}
    self.json_data_dicts: dict[str, JsonDataDict] = {}
    self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
def save_objects_to_database(self) -> None:
    """Flush every cached model to the database in one pass.

    Bulk-saves the ORM models in the same order as before (bpmn processes,
    task models, process-instance events), then upserts the json-data
    records via the class-level helper. Does not commit the session.
    """
    for pending_models in (self.bpmn_processes, self.task_models, self.process_instance_events):
        db.session.bulk_save_objects(pending_models.values())
    self.__class__.insert_or_update_json_data_records(self.json_data_dicts)
def process_parents_and_children_and_save_to_database(
    self,
    spiff_task: SpiffTask,
) -> None:
    """Update task models for *spiff_task*'s subtree and ancestors, then persist.

    Children are processed first, then the chain of enclosing subprocesses,
    and finally all cached objects are flushed via save_objects_to_database().
    """
    self.process_spiff_task_children(spiff_task)
    self.process_spiff_task_parents(spiff_task)
    self.save_objects_to_database()
def process_spiff_task_children(
    self,
    spiff_task: SpiffTask,
) -> None:
    """Recursively update task models for every descendant of *spiff_task*."""
    for child in spiff_task.children:
        # Record the child itself before descending into its own children.
        self.update_task_model_with_spiff_task(spiff_task=child)
        self.process_spiff_task_children(spiff_task=child)
def process_spiff_task_parents(
    self,
    spiff_task: SpiffTask,
) -> None:
    """Walk up through enclosing subprocesses, updating each one's task model.

    Recurses until a task with no parent subprocess is reached.
    """
    (parent_subprocess_guid, _parent_subprocess) = self.__class__.task_subprocess(spiff_task)
    if parent_subprocess_guid is None:
        # Already at the top-level process; nothing above to record.
        return
    outermost_workflow = spiff_task.workflow._get_outermost_workflow()
    parent_spiff_task = outermost_workflow.get_task_from_id(UUID(parent_subprocess_guid))
    if parent_spiff_task is None:
        return
    self.update_task_model_with_spiff_task(spiff_task=parent_spiff_task)
    self.process_spiff_task_parents(spiff_task=parent_spiff_task)
def update_task_model_with_spiff_task(
    self,
    spiff_task: SpiffTask,
    task_failed: bool = False,
) -> TaskModel:
    """Find-or-create the TaskModel for *spiff_task* and cache its updated state.

    Updates the in-memory caches (task_models, json_data_dicts,
    process_instance_events) rather than writing to the database directly;
    save_objects_to_database() performs the actual flush.

    :param spiff_task: the spiff task whose state/data should be recorded.
    :param task_failed: when True, a task_failed event is recorded instead of
        task_completed.
    :return: the (possibly newly created) TaskModel for *spiff_task*.
    """
    (
        new_bpmn_process,
        task_model,
        new_task_models,
        new_json_data_dicts,
    ) = self.__class__.find_or_create_task_model_from_spiff_task(
        spiff_task,
        self.process_instance,
        self.serializer,
        bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
    )
    # Prefer the freshly created bpmn process; fall back to the one already
    # associated with the task model.
    bpmn_process = new_bpmn_process or task_model.bpmn_process
    bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(
        bpmn_process, spiff_task.workflow.data
    )
    self.task_models.update(new_task_models)
    self.json_data_dicts.update(new_json_data_dicts)
    json_data_dict_list = self.__class__.update_task_model(task_model, spiff_task, self.serializer)
    self.task_models[task_model.guid] = task_model
    if bpmn_process_json_data is not None:
        json_data_dict_list.append(bpmn_process_json_data)
    self._update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts)

    # Record a completion (or failure) event for the task; keyed by guid so a
    # re-processed task overwrites its earlier pending event.
    if task_model.state == "COMPLETED" or task_failed:
        event_type = ProcessInstanceEventType.task_completed.value
        if task_failed:
            event_type = ProcessInstanceEventType.task_failed.value

        # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
        # which script tasks execute when READY.
        timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
        process_instance_event = ProcessInstanceEventModel(
            task_guid=task_model.guid,
            process_instance_id=self.process_instance.id,
            event_type=event_type,
            timestamp=timestamp,
        )
        self.process_instance_events[task_model.guid] = process_instance_event

    # NOTE(review): disabled call left in place — confirm whether bpmn-process
    # properties should be updated here before removing this line.
    # self.update_bpmn_process(spiff_task.workflow, bpmn_process)
    return task_model
def update_bpmn_process(
    self,
    spiff_workflow: BpmnWorkflow,
    bpmn_process: BpmnProcessModel,
) -> None:
    """Refresh *bpmn_process* from *spiff_workflow* and cache it for saving.

    Copies properties_json before mutating it, records last_task/success,
    caches any changed task data by hash, then recurses up to the direct
    parent process when this workflow is nested inside an outer one.
    """
    # Copy first so the ORM sees a new dict object and marks the column dirty.
    new_properties_json = copy.copy(bpmn_process.properties_json)
    new_properties_json["last_task"] = str(spiff_workflow.last_task) if spiff_workflow.last_task else None
    new_properties_json["success"] = spiff_workflow.success
    bpmn_process.properties_json = new_properties_json

    bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(bpmn_process, spiff_workflow.data)
    if bpmn_process_json_data is not None:
        self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data

    # The top-level process has no guid of its own; use a sentinel key.
    self.bpmn_processes[bpmn_process.guid or "top_level"] = bpmn_process

    if spiff_workflow.outer_workflow != spiff_workflow:
        direct_parent_bpmn_process = BpmnProcessModel.query.filter_by(
            id=bpmn_process.direct_parent_process_id
        ).first()
        self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process)
@classmethod
def insert_or_update_json_data_records(
cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]
@ -395,3 +527,11 @@ class TaskService:
# this helps to convert items like datetime objects to be json serializable
converted_data: dict = serializer.data_converter.convert(user_defined_state)
return converted_data
@classmethod
def _update_json_data_dicts_using_list(
    cls, json_data_dict_list: list[Optional[JsonDataDict]], json_data_dicts: dict[str, JsonDataDict]
) -> None:
    """Merge the non-None entries of *json_data_dict_list* into *json_data_dicts*, keyed by hash."""
    json_data_dicts.update({entry["hash"]: entry for entry in json_data_dict_list if entry is not None})

View File

@ -1,7 +1,6 @@
import time
from typing import Callable
from typing import Optional
from uuid import UUID
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
@ -10,23 +9,18 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models import task_definition
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.task_service import JsonDataDict
from spiffworkflow_backend.services.task_service import TaskService
@ -67,11 +61,17 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.current_task_model: Optional[TaskModel] = None
self.current_task_start_in_seconds: Optional[float] = None
self.task_models: dict[str, TaskModel] = {}
self.json_data_dicts: dict[str, JsonDataDict] = {}
self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
# self.task_models: dict[str, TaskModel] = {}
# self.json_data_dicts: dict[str, JsonDataDict] = {}
# self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
self.last_completed_spiff_task: Optional[SpiffTask] = None
self.task_service = TaskService(
process_instance=self.process_instance,
serializer=self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model():
self.current_task_start_in_seconds = time.time()
@ -80,7 +80,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model():
task_model = self._update_task_model_with_spiff_task(spiff_task)
task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
if self.current_task_start_in_seconds is None:
raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
task_model.start_in_seconds = self.current_task_start_in_seconds
@ -93,13 +93,9 @@ class TaskModelSavingDelegate(EngineStepDelegate):
script_engine = bpmn_process_instance.script_engine
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
failing_spiff_task = script_engine.failing_spiff_task
self._update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
# import pdb; pdb.set_trace()
db.session.bulk_save_objects(self.task_models.values())
db.session.bulk_save_objects(self.process_instance_events.values())
TaskService.insert_or_update_json_data_records(self.json_data_dicts)
self.task_service.save_objects_to_database()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
@ -115,24 +111,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# ):
# self._update_task_model_with_spiff_task(waiting_spiff_task)
if self.last_completed_spiff_task is not None:
self._process_spiff_task_children(self.last_completed_spiff_task)
self._process_spiff_task_parents(self.last_completed_spiff_task)
def _process_spiff_task_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self._update_task_model_with_spiff_task(child_spiff_task)
self._process_spiff_task_children(child_spiff_task)
def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None:
(parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task)
if parent_subprocess_guid is not None:
spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id(
UUID(parent_subprocess_guid)
)
if spiff_task_of_parent_subprocess is not None:
self._update_task_model_with_spiff_task(spiff_task_of_parent_subprocess)
self._process_spiff_task_parents(spiff_task_of_parent_subprocess)
self.task_service.process_spiff_task_parents(self.last_completed_spiff_task)
self.task_service.process_spiff_task_children(self.last_completed_spiff_task)
def _should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance.
@ -142,63 +122,6 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# return self.process_instance.bpmn_process_id is not None
return True
def _update_json_data_dicts_using_list(self, json_data_dict_list: list[Optional[JsonDataDict]]) -> None:
for json_data_dict in json_data_dict_list:
if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask, task_failed: bool = False) -> TaskModel:
(
bpmn_process,
task_model,
new_task_models,
new_json_data_dicts,
) = TaskService.find_or_create_task_model_from_spiff_task(
spiff_task,
self.process_instance,
self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(
bpmn_process or task_model.bpmn_process, spiff_task.workflow.data
)
# stp = False
# for ntm in new_task_models.values():
# td = TaskDefinitionModel.query.filter_by(id=ntm.task_definition_id).first()
# if td.bpmn_identifier == 'Start':
# # import pdb; pdb.set_trace()
# stp = True
# print("HEY")
# if stp:
# # import pdb; pdb.set_trace()
# print("HEY2")
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer)
self.task_models[task_model.guid] = task_model
if bpmn_process_json_data is not None:
json_data_dict_list.append(bpmn_process_json_data)
self._update_json_data_dicts_using_list(json_data_dict_list)
if task_model.state == "COMPLETED" or task_failed:
event_type = ProcessInstanceEventType.task_completed.value
if task_failed:
event_type = ProcessInstanceEventType.task_failed.value
# FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
# which script tasks execute when READY.
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event = ProcessInstanceEventModel(
task_guid=task_model.guid,
process_instance_id=self.process_instance.id,
event_type=event_type,
timestamp=timestamp,
)
self.process_instance_events[task_model.guid] = process_instance_event
return task_model
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""

View File

@ -4,7 +4,7 @@
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0stlaxe</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:endEvent id="end_event_of_manual_task_model">
<bpmn:endEvent id="end_event_of_manual_task_model" name="End Event Of Manual Task Model">
<bpmn:incoming>Flow_1ygcsbt</bpmn:incoming>
</bpmn:endEvent>
<bpmn:manualTask id="top_level_manual_task_two" name="Top Level Manual Task Two">
@ -23,7 +23,7 @@
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_1fktmf7" sourceRef="top_level_script" targetRef="top_level_manual_task_one" />
<bpmn:sequenceFlow id="Flow_09gjylo" sourceRef="top_level_manual_task_two" targetRef="top_level_subprocess" />
<bpmn:subProcess id="top_level_subprocess">
<bpmn:subProcess id="top_level_subprocess" name="Top Level Subprocess">
<bpmn:incoming>Flow_09gjylo</bpmn:incoming>
<bpmn:outgoing>Flow_0yxus36</bpmn:outgoing>
<bpmn:startEvent id="Event_0g7txdo">
@ -46,7 +46,7 @@ except:
we_move_on = False</bpmn:script>
</bpmn:scriptTask>
</bpmn:subProcess>
<bpmn:callActivity id="top_level_call_activity" calledElement="test_process_to_call">
<bpmn:callActivity id="top_level_call_activity" name="Top Level Call Activity" calledElement="test_process_to_call">
<bpmn:incoming>Flow_0yxus36</bpmn:incoming>
<bpmn:outgoing>Flow_187mcqe</bpmn:outgoing>
</bpmn:callActivity>
@ -60,7 +60,7 @@ except:
<bpmn:conditionExpression>we_move_on == True</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="Flow_1ygcsbt" sourceRef="top_level_process_script_after_gate" targetRef="end_event_of_manual_task_model" />
<bpmn:scriptTask id="top_level_process_script_after_gate">
<bpmn:scriptTask id="top_level_process_script_after_gate" name="Top Level Process Script After Gate">
<bpmn:incoming>Flow_0lw7sda</bpmn:incoming>
<bpmn:outgoing>Flow_1ygcsbt</bpmn:outgoing>
<bpmn:script>set_top_level_process_script_after_gate = 1</bpmn:script>
@ -78,30 +78,36 @@ except:
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1vokg57_di" bpmnElement="top_level_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="end_event_of_manual_task_model">
<dc:Bounds x="1212" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1yhtryv_di" bpmnElement="top_level_process_script_after_gate">
<dc:Bounds x="1080" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0p8naw0_di" bpmnElement="Gateway_0p8naw0" isMarkerVisible="true">
<dc:Bounds x="1005" y="152" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_04hrmow_di" bpmnElement="top_level_call_activity">
<dc:Bounds x="870" y="137" width="100" height="80" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1200" y="202" width="67" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1rcj16n_di" bpmnElement="top_level_manual_task_two">
<dc:Bounds x="610" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0ctgju0_di" bpmnElement="top_level_manual_task_one">
<dc:Bounds x="450" y="137" width="100" height="80" />
<bpmndi:BPMNShape id="Activity_1vokg57_di" bpmnElement="top_level_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_19a46sv_di" bpmnElement="top_level_subprocess">
<dc:Bounds x="740" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_04hrmow_di" bpmnElement="top_level_call_activity">
<dc:Bounds x="870" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0p8naw0_di" bpmnElement="Gateway_0p8naw0" isMarkerVisible="true">
<dc:Bounds x="1005" y="152" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1yhtryv_di" bpmnElement="top_level_process_script_after_gate">
<dc:Bounds x="1080" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0ctgju0_di" bpmnElement="top_level_manual_task_one">
<dc:Bounds x="450" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0stlaxe_di" bpmnElement="Flow_0stlaxe">
<di:waypoint x="215" y="177" />

View File

@ -1,12 +1,12 @@
"""Test_process_instance_processor."""
from uuid import UUID
import json
import pytest
from flask import g
from flask.app import Flask
from flask.testing import FlaskClient
from SpiffWorkflow.task import TaskState # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -16,6 +16,7 @@ from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import (
@ -297,6 +298,7 @@ class TestProcessInstanceProcessor(BaseTest):
spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
human_task_one.task_name, processor.bpmn_process_instance
)
assert spiff_manual_task is not None
processor.suspend()
ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True)
@ -336,7 +338,7 @@ class TestProcessInstanceProcessor(BaseTest):
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2))
# with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2))
assert len(process_instance.active_human_tasks) == 1
initial_human_task_id = process_instance.active_human_tasks[0].id
assert len(process_instance.active_human_tasks) == 1
@ -346,20 +348,21 @@ class TestProcessInstanceProcessor(BaseTest):
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
processor.suspend()
ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True)
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor = ProcessInstanceProcessor(process_instance)
with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2))
# with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2))
processor.resume()
processor.do_engine_steps(save=True)
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
import pdb; pdb.set_trace()
import pdb
pdb.set_trace()
assert process_instance.status == "complete"
def test_properly_saves_tasks_when_running(
@ -409,6 +412,9 @@ class TestProcessInstanceProcessor(BaseTest):
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
# recreate variables to ensure all bpmn json was recreated from scratch from the db
process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
@ -429,34 +435,74 @@ class TestProcessInstanceProcessor(BaseTest):
},
}
fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}}
fifth_data_set = {**fourth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}}
fifth_data_set = {**fourth_data_set, **{"set_top_level_process_script_after_gate": 1}}
sixth_data_set = {**fifth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}}
expected_task_data = {
"top_level_script": first_data_set,
"manual_task": first_data_set,
"top_level_subprocess_script": second_data_set,
"top_level_subprocess": second_data_set,
"test_process_to_call_subprocess_script": third_data_set,
"top_level_call_activity": third_data_set,
"end_event_of_manual_task_model": third_data_set,
"top_level_subprocess_script_second": fourth_data_set,
"test_process_to_call_subprocess_script_second": fourth_data_set,
"top_level_script": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"},
"top_level_manual_task_one": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"},
"top_level_manual_task_two": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"},
"top_level_subprocess_script": {
"data": second_data_set,
"bpmn_process_identifier": "top_level_subprocess",
},
"top_level_subprocess": {"data": second_data_set, "bpmn_process_identifier": "top_level_process"},
"test_process_to_call_subprocess_script": {
"data": third_data_set,
"bpmn_process_identifier": "test_process_to_call_subprocess",
},
"top_level_call_activity": {"data": third_data_set, "bpmn_process_identifier": "top_level_process"},
"top_level_manual_task_two_second": {
"data": third_data_set,
"bpmn_process_identifier": "top_level_process",
},
"top_level_subprocess_script_second": {
"data": fourth_data_set,
"bpmn_process_identifier": "top_level_subprocess",
},
"top_level_subprocess_second": {"data": fourth_data_set, "bpmn_process_identifier": "top_level_process"},
"test_process_to_call_subprocess_script_second": {
"data": fourth_data_set,
"bpmn_process_identifier": "test_process_to_call_subprocess",
},
"top_level_call_activity_second": {
"data": fourth_data_set,
"bpmn_process_identifier": "top_level_process",
},
"end_event_of_manual_task_model": {"data": fifth_data_set, "bpmn_process_identifier": "top_level_process"},
}
spiff_tasks_checked_once: list = []
spiff_tasks_checked: list[str] = []
# TODO: also check task data here from the spiff_task directly to ensure we hydrated spiff correctly
def assert_spiff_task_is_in_process(spiff_task_identifier: str, bpmn_process_identifier: str) -> None:
if spiff_task.task_spec.name == spiff_task_identifier:
expected_task_data_key = spiff_task.task_spec.name
if spiff_task.task_spec.name in spiff_tasks_checked_once:
def assert_spiff_task_is_in_process(spiff_task: SpiffTask) -> None:
spiff_task_identifier = spiff_task.task_spec.name
if spiff_task_identifier in expected_task_data:
bpmn_process_identifier = expected_task_data[spiff_task_identifier]["bpmn_process_identifier"]
expected_task_data_key = spiff_task_identifier
if spiff_task_identifier in spiff_tasks_checked:
expected_task_data_key = f"{spiff_task.task_spec.name}_second"
expected_python_env_data = expected_task_data[expected_task_data_key]
assert expected_task_data_key not in spiff_tasks_checked
spiff_tasks_checked.append(expected_task_data_key)
expected_python_env_data = expected_task_data[expected_task_data_key]["data"]
base_failure_message = (
f"Failed on {bpmn_process_identifier} - {spiff_task_identifier} - task data key"
f" {expected_task_data_key}."
)
count_failure_message = (
f"{base_failure_message} There are more than 2 entries of this task in the db."
" There should only ever be max 2."
)
task_models_with_bpmn_identifier_count = (
TaskModel.query.join(TaskDefinitionModel)
.filter(TaskDefinitionModel.bpmn_identifier == spiff_task.task_spec.name)
.count()
)
assert task_models_with_bpmn_identifier_count < 3, count_failure_message
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task_model.start_in_seconds is not None
@ -466,7 +512,9 @@ class TestProcessInstanceProcessor(BaseTest):
task_definition = task_model.task_definition
assert task_definition.bpmn_identifier == spiff_task_identifier
assert task_definition.bpmn_name == spiff_task_identifier.replace("_", " ").title()
assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier
assert (
task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier
), base_failure_message
message = (
f"{base_failure_message} Expected: {sorted(expected_python_env_data)}. Received:"
@ -474,18 +522,14 @@ class TestProcessInstanceProcessor(BaseTest):
)
# TODO: if we split out env data again we will need to use it here instead of json_data
# assert task_model.python_env_data() == expected_python_env_data, message
# import pdb; pdb.set_trace()
assert task_model.json_data() == expected_python_env_data, message
spiff_tasks_checked_once.append(spiff_task.task_spec.name)
all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()
assert len(all_spiff_tasks) > 1
for spiff_task in all_spiff_tasks:
assert spiff_task.state == TaskState.COMPLETED
assert_spiff_task_is_in_process(
"test_process_to_call_subprocess_script", "test_process_to_call_subprocess"
)
assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess")
assert_spiff_task_is_in_process("top_level_script", "top_level_process")
assert_spiff_task_is_in_process(spiff_task)
if spiff_task.task_spec.name == "top_level_call_activity":
# the task id / guid of the call activity gets used as the guid of the bpmn process that it calls
@ -513,7 +557,14 @@ class TestProcessInstanceProcessor(BaseTest):
assert direct_parent_process is not None
assert direct_parent_process.bpmn_process_definition.bpmn_identifier == "test_process_to_call"
assert processor.get_data() == fifth_data_set
for task_bpmn_identifier in expected_task_data.keys():
message = (
f"Expected to have seen a task with a bpmn_identifier of {task_bpmn_identifier} but did not. "
f"Only saw {sorted(spiff_tasks_checked)}"
)
assert task_bpmn_identifier in spiff_tasks_checked, message
assert processor.get_data() == sixth_data_set
def test_does_not_recreate_human_tasks_on_multiple_saves(
self,