WIP - test is still passing, no longer processing all tasks w/ burnettk

This commit is contained in:
jasquat 2023-04-05 16:07:35 -04:00
parent 140761c198
commit c5b85fd404
5 changed files with 2773 additions and 23 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1875,11 +1875,11 @@ test = ["pytest"]
[[package]] [[package]]
name = "SpiffWorkflow" name = "SpiffWorkflow"
version = "1.2.1" version = "1.2.1"
description = "" description = "A workflow framework and BPMN/DMN Processor"
category = "main" category = "main"
optional = false optional = false
python-versions = "*" python-versions = "*"
develop = false develop = true
[package.dependencies] [package.dependencies]
celery = "*" celery = "*"
@ -1887,10 +1887,8 @@ configparser = "*"
lxml = "*" lxml = "*"
[package.source] [package.source]
type = "git" type = "directory"
url = "https://github.com/sartography/SpiffWorkflow" url = "../../SpiffWorkflow"
reference = "main"
resolved_reference = "e1add839ddf2512f27cd0afe681ff3e0460d6f7a"
[[package]] [[package]]
name = "sqlalchemy" name = "sqlalchemy"
@ -2273,7 +2271,7 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.9,<3.12" python-versions = ">=3.9,<3.12"
content-hash = "9fea44386fbab29102a051a254058909568c4ee3dbd6a402fb91aacbcf1f7fd2" content-hash = "c4bb5e0ce1ad140b0e5b109ab3f9f136844815bd6db8200e546d44d533050612"
[metadata.files] [metadata.files]
alabaster = [ alabaster = [

View File

@ -27,9 +27,9 @@ flask-marshmallow = "*"
flask-migrate = "*" flask-migrate = "*"
flask-restful = "*" flask-restful = "*"
werkzeug = "*" werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} # SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"} # SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" } SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
sentry-sdk = "^1.10" sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0" sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"} flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}

View File

@ -136,15 +136,17 @@ def process_instance_run(
ErrorHandlingService().handle_error(processor, e) ErrorHandlingService().handle_error(processor, e)
raise e raise e
except Exception as e: except Exception as e:
ErrorHandlingService().handle_error(processor, e) raise e
# fixme: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes # import pdb; pdb.set_trace()
task = processor.bpmn_process_instance.last_task # ErrorHandlingService().handle_error(processor, e)
raise ApiError.from_task( # # fixme: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes
error_code="unknown_exception", # task = processor.bpmn_process_instance.last_task
message=f"An unknown error occurred. Original error: {e}", # raise ApiError.from_task(
status_code=400, # error_code="unknown_exception",
task=task, # message=f"An unknown error occurred. Original error: {e}",
) from e # status_code=400,
# task=task,
# ) from e
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances() MessageService.correlate_all_message_instances()

View File

@ -1,11 +1,13 @@
import time import time
from typing import Callable from typing import Callable
from typing import Set
import json import json
from typing import Optional from typing import Optional
from uuid import UUID
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore from SpiffWorkflow.exceptions import SpiffWorkflowException, TaskNotFoundException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
@ -63,6 +65,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.current_task_start_in_seconds: Optional[float] = None self.current_task_start_in_seconds: Optional[float] = None
self.last_completed_spiff_task: Optional[SpiffTask] = None self.last_completed_spiff_task: Optional[SpiffTask] = None
self.spiff_tasks_to_process: Set[UUID] = set()
self.task_service = TaskService( self.task_service = TaskService(
process_instance=self.process_instance, process_instance=self.process_instance,
@ -91,6 +94,10 @@ class TaskModelSavingDelegate(EngineStepDelegate):
task_model.start_in_seconds = self.current_task_start_in_seconds task_model.start_in_seconds = self.current_task_start_in_seconds
task_model.end_in_seconds = time.time() task_model.end_in_seconds = time.time()
self.last_completed_spiff_task = spiff_task self.last_completed_spiff_task = spiff_task
self.spiff_tasks_to_process.add(spiff_task.id)
self._add_children(spiff_task)
self._add_parents(spiff_task)
# self.task_service.process_spiff_task_parent_subprocess_tasks(spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(spiff_task)
# self.task_service.process_spiff_task_children(spiff_task) # self.task_service.process_spiff_task_children(spiff_task)
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
@ -110,18 +117,47 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
db.session.commit() db.session.commit()
def _add_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self.spiff_tasks_to_process.add(child_spiff_task.id)
self._add_children(child_spiff_task)
def _add_parents(self, spiff_task: SpiffTask) -> None:
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
self.spiff_tasks_to_process.add(spiff_task.parent.id)
self._add_parents(spiff_task.parent)
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model(): if self._should_update_task_model():
# excludes COMPLETED. the others were required to get PP1 to go to completion. # excludes COMPLETED. the others were required to get PP1 to go to completion.
# process FUTURE tasks because Boundary events are not processed otherwise. # process FUTURE tasks because Boundary events are not processed otherwise.
for waiting_spiff_task in bpmn_process_instance.get_tasks( # for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY | TaskState.FUTURE # TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY | TaskState.FUTURE
): # ):
for spiff_task_guid in self.spiff_tasks_to_process:
if spiff_task_guid is None:
continue
try:
print(f"spiff_task_guid: {spiff_task_guid}")
waiting_spiff_task = bpmn_process_instance.get_task_from_id(spiff_task_guid)
except TaskNotFoundException:
continue
# if waiting_spiff_task.task_spec.name == 'top_level_manual_task_two':
# import pdb; pdb.set_trace()
# print("HEY42")
# include PREDICTED_MASK tasks in list so we can remove them from the parent # include PREDICTED_MASK tasks in list so we can remove them from the parent
if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK): if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK):
TaskService.remove_spiff_task_from_parent(waiting_spiff_task, self.task_service.task_models) TaskService.remove_spiff_task_from_parent(waiting_spiff_task, self.task_service.task_models)
for cpt in waiting_spiff_task.parent.children:
if cpt.id == waiting_spiff_task.id:
waiting_spiff_task.parent.children.remove(cpt)
continue continue
try:
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task) self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
except Exception as ex:
import pdb; pdb.set_trace()
print("HEY16")
# self.task_service.process_spiff_task_parent_subprocess_tasks(waiting_spiff_task)
# if self.last_completed_spiff_task is not None: # if self.last_completed_spiff_task is not None:
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
@ -250,6 +286,7 @@ class WorkflowExecutionService:
self.process_bpmn_messages() self.process_bpmn_messages()
self.queue_waiting_receive_messages() self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe: except SpiffWorkflowException as swe:
raise swe
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally: finally: