using spiffworkflow run-boundary-events-from-engine-steps branch and tests pass w/ burnettk

This commit is contained in:
jasquat 2023-04-10 12:22:33 -04:00
parent f6bcf130e7
commit 5c2f0ef9de
5 changed files with 45 additions and 21 deletions

View File

@ -1889,8 +1889,8 @@ lxml = "*"
[package.source] [package.source]
type = "git" type = "git"
url = "https://github.com/sartography/SpiffWorkflow" url = "https://github.com/sartography/SpiffWorkflow"
reference = "main" reference = "bugfix/run-boundary-events-from-engine-steps"
resolved_reference = "1c877dd768053b4cce4c4e14c92caa3216371751" resolved_reference = "067d6a723b2533ba51d83f94f7f3609004474673"
[[package]] [[package]]
name = "sqlalchemy" name = "sqlalchemy"
@ -2273,7 +2273,7 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.9,<3.12" python-versions = ">=3.9,<3.12"
content-hash = "9fea44386fbab29102a051a254058909568c4ee3dbd6a402fb91aacbcf1f7fd2" content-hash = "3f192bd96668c070edc1270804ba560b7e0df3df09b7f23f2651b2dcd68c1500"
[metadata.files] [metadata.files]
alabaster = [ alabaster = [

View File

@ -27,7 +27,7 @@ flask-marshmallow = "*"
flask-migrate = "*" flask-migrate = "*"
flask-restful = "*" flask-restful = "*"
werkzeug = "*" werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "bugfix/run-boundary-events-from-engine-steps"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"} # SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
# SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" } # SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
sentry-sdk = "^1.10" sentry-sdk = "^1.10"

View File

@ -1649,7 +1649,7 @@ class ProcessInstanceProcessor:
self.save, self.save,
) )
try: try:
execution_service.do_engine_steps(exit_at, save) execution_service.run_until_user_input_required_and_save(exit_at, save)
finally: finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable. # script engine on a class variable.
@ -1839,7 +1839,7 @@ class ProcessInstanceProcessor:
Return either the most recent task data or--if the process instance is complete-- Return either the most recent task data or--if the process instance is complete--
the process data. the process data.
""" """
if self.process_instance_model.status == "complete": if self.process_instance_model.status == ProcessInstanceStatus.complete.value:
return self.get_data() return self.get_data()
most_recent_task = None most_recent_task = None

View File

@ -192,7 +192,7 @@ class ExecutionStrategy:
"""__init__.""" """__init__."""
self.delegate = delegate self.delegate = delegate
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass pass
def save(self, bpmn_process_instance: BpmnWorkflow) -> None: def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
@ -202,7 +202,7 @@ class ExecutionStrategy:
class GreedyExecutionStrategy(ExecutionStrategy): class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping.""" """The common execution strategy. This will greedily run all engine steps without stopping."""
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance self.bpmn_process_instance = bpmn_process_instance
bpmn_process_instance.do_engine_steps( bpmn_process_instance.do_engine_steps(
exit_at=exit_at, exit_at=exit_at,
@ -219,7 +219,7 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
return (to an interstitial page). The background processor would then take over. return (to an interstitial page). The background processor would then take over.
""" """
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance self.bpmn_process_instance = bpmn_process_instance
engine_steps = list( engine_steps = list(
[ [
@ -278,7 +278,13 @@ class WorkflowExecutionService:
self.process_instance_completer = process_instance_completer self.process_instance_completer = process_instance_completer
self.process_instance_saver = process_instance_saver self.process_instance_saver = process_instance_saver
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: # names of methods that do spiff stuff:
# processor.do_engine_steps calls:
# run_until_user_input_required_and_save
# run_until_user_input_required
# execution_strategy.spiff_run
# spiff.do_engine_steps
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps.""" """Do_engine_steps."""
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped: if tripped:
@ -288,16 +294,8 @@ class WorkflowExecutionService:
) )
try: try:
self.bpmn_process_instance.refresh_waiting_tasks() self.run_until_user_input_required(exit_at)
# TODO: implicit re-entrant locks here `with_dequeued`
self.execution_strategy.do_engine_steps(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe: except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
@ -308,6 +306,31 @@ class WorkflowExecutionService:
if save: if save:
self.process_instance_saver() self.process_instance_saver()
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued`
self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
self.bpmn_process_instance.refresh_waiting_tasks()
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
non_human_waiting_task = next(
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
)
# non_human_waiting_task = next((p for p in ready_tasks), None)
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
def process_bpmn_messages(self) -> None: def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages.""" """Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages() bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
@ -379,11 +402,11 @@ class WorkflowExecutionService:
class ProfiledWorkflowExecutionService(WorkflowExecutionService): class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service.""" """A profiled version of the workflow execution service."""
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps.""" """__do_engine_steps."""
import cProfile import cProfile
from pstats import SortKey from pstats import SortKey
with cProfile.Profile() as pr: with cProfile.Profile() as pr:
super().do_engine_steps(exit_at=exit_at, save=save) super().run_until_user_input_required_and_save(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE) pr.print_stats(sort=SortKey.CUMULATIVE)

View File

@ -1,4 +1,5 @@
"""Process Model.""" """Process Model."""
import json
import re import re
from flask.app import Flask from flask.app import Flask