some cleanup, updated SpiffWorkflow, and added some notes to pick back up where we left off with the more efficient method w/ burnettk

This commit is contained in:
jasquat 2023-04-06 10:57:52 -04:00
parent e16f40d13e
commit d6067d701f
No known key found for this signature in database
8 changed files with 55 additions and 56 deletions

View File

@ -1879,7 +1879,7 @@ description = "A workflow framework and BPMN/DMN Processor"
category = "main" category = "main"
optional = false optional = false
python-versions = "*" python-versions = "*"
develop = true develop = false
[package.dependencies] [package.dependencies]
celery = "*" celery = "*"
@ -1887,8 +1887,10 @@ configparser = "*"
lxml = "*" lxml = "*"
[package.source] [package.source]
type = "directory" type = "git"
url = "../../SpiffWorkflow" url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "96ad2a2b060deb445c39374f065690023351de19"
[[package]] [[package]]
name = "sqlalchemy" name = "sqlalchemy"
@ -2271,7 +2273,7 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.9,<3.12" python-versions = ">=3.9,<3.12"
content-hash = "c4bb5e0ce1ad140b0e5b109ab3f9f136844815bd6db8200e546d44d533050612" content-hash = "9fea44386fbab29102a051a254058909568c4ee3dbd6a402fb91aacbcf1f7fd2"
[metadata.files] [metadata.files]
alabaster = [ alabaster = [

View File

@ -27,9 +27,9 @@ flask-marshmallow = "*"
flask-migrate = "*" flask-migrate = "*"
flask-restful = "*" flask-restful = "*"
werkzeug = "*" werkzeug = "*"
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"} # SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" } # SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
sentry-sdk = "^1.10" sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0" sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"} flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}

View File

@ -136,17 +136,16 @@ def process_instance_run(
ErrorHandlingService().handle_error(processor, e) ErrorHandlingService().handle_error(processor, e)
raise e raise e
except Exception as e: except Exception as e:
raise e ErrorHandlingService().handle_error(processor, e)
# import pdb; pdb.set_trace() # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
# ErrorHandlingService().handle_error(processor, e) # we need to recurse through all last tasks if the last task is a call activity or subprocess.
# # fixme: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes task = processor.bpmn_process_instance.last_task
# task = processor.bpmn_process_instance.last_task raise ApiError.from_task(
# raise ApiError.from_task( error_code="unknown_exception",
# error_code="unknown_exception", message=f"An unknown error occurred. Original error: {e}",
# message=f"An unknown error occurred. Original error: {e}", status_code=400,
# status_code=400, task=task,
# task=task, ) from e
# ) from e
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances() MessageService.correlate_all_message_instances()

View File

@ -159,7 +159,6 @@ class MessageService:
) -> None: ) -> None:
"""process_message_receive.""" """process_message_receive."""
processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive = ProcessInstanceProcessor(process_instance_receive)
# import pdb; pdb.set_trace()
processor_receive.bpmn_process_instance.catch_bpmn_message(message_model_name, message_payload) processor_receive.bpmn_process_instance.catch_bpmn_message(message_model_name, message_payload)
processor_receive.do_engine_steps(save=True) processor_receive.do_engine_steps(save=True)
message_instance_receive.status = MessageStatuses.completed.value message_instance_receive.status = MessageStatuses.completed.value

View File

@ -354,9 +354,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
external_methods: Optional[Dict[str, Any]] = None, external_methods: Optional[Dict[str, Any]] = None,
) -> Any: ) -> Any:
"""_evaluate.""" """_evaluate."""
# if task.task_spec.name == 'passing_script_task':
# import pdb; pdb.set_trace()
# print("HEY2")
methods = self.__get_augment_methods(task) methods = self.__get_augment_methods(task)
if external_methods: if external_methods:
methods.update(external_methods) methods.update(external_methods)
@ -378,9 +375,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None: def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None:
"""Execute.""" """Execute."""
# if task.task_spec.name == 'passing_script_task':
# import pdb; pdb.set_trace()
# print("HEY3")
try: try:
# reset failing task just in case # reset failing task just in case
self.failing_spiff_task = None self.failing_spiff_task = None

View File

@ -134,10 +134,11 @@ class TaskService:
self.json_data_dicts.update(new_json_data_dicts) self.json_data_dicts.update(new_json_data_dicts)
# we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value # we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value
bpmn_process = new_bpmn_process or task_model.bpmn_process or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first() bpmn_process = (
# if bpmn_process is None: new_bpmn_process
# import pdb; pdb.set_trace() or task_model.bpmn_process
# print("HEY") or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first()
)
bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process( bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(
bpmn_process, spiff_task.workflow.data bpmn_process, spiff_task.workflow.data
@ -149,8 +150,8 @@ class TaskService:
self.update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts) self.update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts)
if start_and_end_times: if start_and_end_times:
task_model.start_in_seconds = start_and_end_times['start_in_seconds'] task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
task_model.end_in_seconds = start_and_end_times['end_in_seconds'] task_model.end_in_seconds = start_and_end_times["end_in_seconds"]
if task_model.state == "COMPLETED" or task_failed: if task_model.state == "COMPLETED" or task_failed:
event_type = ProcessInstanceEventType.task_completed.value event_type = ProcessInstanceEventType.task_completed.value
@ -196,9 +197,6 @@ class TaskService:
direct_parent_bpmn_process = BpmnProcessModel.query.filter_by( direct_parent_bpmn_process = BpmnProcessModel.query.filter_by(
id=bpmn_process.direct_parent_process_id id=bpmn_process.direct_parent_process_id
).first() ).first()
# if direct_parent_bpmn_process is None:
# import pdb; pdb.set_trace()
# print("HEY22")
self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process) self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process)
@classmethod @classmethod
@ -460,7 +458,7 @@ class TaskService:
spiff_task_guid = str(spiff_task.id) spiff_task_guid = str(spiff_task.id)
if spiff_task_parent_guid in task_models: if spiff_task_parent_guid in task_models:
parent_task_model = task_models[spiff_task_parent_guid] parent_task_model = task_models[spiff_task_parent_guid]
if spiff_task_guid in parent_task_model.properties_json['children']: if spiff_task_guid in parent_task_model.properties_json["children"]:
new_parent_properties_json = copy.copy(parent_task_model.properties_json) new_parent_properties_json = copy.copy(parent_task_model.properties_json)
new_parent_properties_json["children"].remove(spiff_task_guid) new_parent_properties_json["children"].remove(spiff_task_guid)
parent_task_model.properties_json = new_parent_properties_json parent_task_model.properties_json = new_parent_properties_json

View File

@ -1,13 +1,12 @@
import time import time
from typing import Callable from typing import Callable
from typing import Set
import json
from typing import Optional from typing import Optional
from typing import Set
from uuid import UUID from uuid import UUID
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException, TaskNotFoundException # type: ignore from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
@ -18,13 +17,13 @@ from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel, MessageInstanceCorrelationRuleModel,
) )
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401 from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401
from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.process_instance_lock_service import ( from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService, ProcessInstanceLockService,
) )
from spiffworkflow_backend.services.task_service import StartAndEndTimes, TaskService from spiffworkflow_backend.services.task_service import StartAndEndTimes
from spiffworkflow_backend.services.task_service import TaskService
class EngineStepDelegate: class EngineStepDelegate:
@ -75,7 +74,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def will_complete_task(self, spiff_task: SpiffTask) -> None: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model(): if self._should_update_task_model():
self.spiff_task_timestamps[spiff_task.id] = {'start_in_seconds': time.time(), 'end_in_seconds': None} self.spiff_task_timestamps[spiff_task.id] = {"start_in_seconds": time.time(), "end_in_seconds": None}
spiff_task.task_spec._predict(spiff_task, mask=TaskState.NOT_FINISHED_MASK) spiff_task.task_spec._predict(spiff_task, mask=TaskState.NOT_FINISHED_MASK)
self.current_task_start_in_seconds = time.time() self.current_task_start_in_seconds = time.time()
@ -85,17 +84,18 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def did_complete_task(self, spiff_task: SpiffTask) -> None: def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model(): if self._should_update_task_model():
# NOTE: used with process-all-tasks and process-children-of-last-task
task_model = self.task_service.update_task_model_with_spiff_task(spiff_task) task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
if self.current_task_start_in_seconds is None: if self.current_task_start_in_seconds is None:
raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend") raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
task_model.start_in_seconds = self.current_task_start_in_seconds task_model.start_in_seconds = self.current_task_start_in_seconds
task_model.end_in_seconds = time.time() task_model.end_in_seconds = time.time()
self.spiff_task_timestamps[spiff_task.id]['end_in_seconds'] = time.time() # # NOTE: used with process-spiff-tasks-list
self.spiff_tasks_to_process.add(spiff_task.id) # self.spiff_task_timestamps[spiff_task.id]['end_in_seconds'] = time.time()
self._add_children(spiff_task) # self.spiff_tasks_to_process.add(spiff_task.id)
# self._add_parents(spiff_task) # self._add_children(spiff_task)
# # self._add_parents(spiff_task)
self.last_completed_spiff_task = spiff_task self.last_completed_spiff_task = spiff_task
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
@ -127,24 +127,27 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model(): if self._should_update_task_model():
# NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace
# excludes COMPLETED. the others were required to get PP1 to go to completion. # excludes COMPLETED. the others were required to get PP1 to go to completion.
# process FUTURE tasks because Boundary events are not processed otherwise. # process FUTURE tasks because Boundary events are not processed otherwise.
for waiting_spiff_task in bpmn_process_instance.get_tasks( for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY | TaskState.FUTURE TaskState.WAITING
# TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY | TaskState.CANCELLED
# TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.FUTURE | TaskState.READY
| TaskState.MAYBE
| TaskState.LIKELY
| TaskState.FUTURE
): ):
if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK): if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK):
TaskService.remove_spiff_task_from_parent(waiting_spiff_task, self.task_service.task_models) TaskService.remove_spiff_task_from_parent(waiting_spiff_task, self.task_service.task_models)
for cpt in waiting_spiff_task.parent.children:
if cpt.id == waiting_spiff_task.id:
waiting_spiff_task.parent.children.remove(cpt)
continue continue
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task) self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
if self.last_completed_spiff_task is not None:
self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
# # NOTE: process-spiff-tasks-list: this would be the ideal way to handle all tasks
# # but we're missing something with it yet
# #
# # adding from line here until we are ready to go with this
# from SpiffWorkflow.exceptions import TaskNotFoundException
# for spiff_task_uuid in self.spiff_tasks_to_process: # for spiff_task_uuid in self.spiff_tasks_to_process:
# try: # try:
# waiting_spiff_task = bpmn_process_instance.get_task_from_id(spiff_task_uuid) # waiting_spiff_task = bpmn_process_instance.get_task_from_id(spiff_task_uuid)
@ -168,6 +171,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# if self.last_completed_spiff_task is not None: # if self.last_completed_spiff_task is not None:
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
# # NOTE: process-children-of-last-task: this does not work with escalation boundary events
# if self.last_completed_spiff_task is not None: # if self.last_completed_spiff_task is not None:
# self.task_service.process_spiff_task_children(self.last_completed_spiff_task) # self.task_service.process_spiff_task_children(self.last_completed_spiff_task)
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)

View File

@ -346,7 +346,6 @@ class TestProcessInstanceProcessor(BaseTest):
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
# import pdb; pdb.set_trace()
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
processor.suspend() processor.suspend()
@ -584,7 +583,11 @@ class TestProcessInstanceProcessor(BaseTest):
assert direct_parent_process.bpmn_process_definition.bpmn_identifier == "test_process_to_call" assert direct_parent_process.bpmn_process_definition.bpmn_identifier == "test_process_to_call"
spiff_tasks_checked.append(spiff_task.task_spec.name) spiff_tasks_checked.append(spiff_task.task_spec.name)
expected_task_identifiers = list(expected_task_data.keys()) + ['our_boundary_event', 'test_process_to_call_subprocess_script', 'top_level_call_activity'] expected_task_identifiers = list(expected_task_data.keys()) + [
"our_boundary_event",
"test_process_to_call_subprocess_script",
"top_level_call_activity",
]
for task_bpmn_identifier in expected_task_identifiers: for task_bpmn_identifier in expected_task_identifiers:
message = ( message = (
f"Expected to have seen a task with a bpmn_identifier of {task_bpmn_identifier} but did not. " f"Expected to have seen a task with a bpmn_identifier of {task_bpmn_identifier} but did not. "