Track spiff step details more granularly (#17)

This commit is contained in:
jbirddog 2022-11-03 15:07:46 -04:00 committed by GitHub
parent 55cb9f4fd0
commit a8e904c6b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 29 deletions

View File

@ -161,7 +161,9 @@ class BpmnWorkflow(Workflow):
event_definition.payload = payload
self.catch(event_definition, correlations=correlations)
def do_engine_steps(self, exit_at = None):
def do_engine_steps(self, exit_at = None,
will_complete_task=None,
did_complete_task=None):
"""
Execute any READY tasks that are engine specific (for example, gateways
or script tasks). This is done in a loop, so it will keep completing
@ -169,6 +171,8 @@ class BpmnWorkflow(Workflow):
left.
:param exit_at: After executing a task with a name matching this param return the task object
:param will_complete_task: Callback that will be called prior to completing a task
:param did_complete_task: Callback that will be called after completing a task
"""
assert not self.read_only
engine_steps = list(
@ -176,21 +180,34 @@ class BpmnWorkflow(Workflow):
if self._is_engine_task(t.task_spec)])
while engine_steps:
for task in engine_steps:
if will_complete_task is not None:
will_complete_task(task)
task.complete()
if did_complete_task is not None:
did_complete_task(task)
if task.task_spec.name == exit_at:
return task
engine_steps = list(
[t for t in self.get_tasks(TaskState.READY)
if self._is_engine_task(t.task_spec)])
def refresh_waiting_tasks(self):
def refresh_waiting_tasks(self,
will_refresh_task=None,
did_refresh_task=None):
"""
Refresh the state of all WAITING tasks. This will, for example, update
Catching Timer Events whose waiting time has passed.
:param will_refresh_task: Callback that will be called prior to refreshing a task
:param did_refresh_task: Callback that will be called after refreshing a task
"""
assert not self.read_only
for my_task in self.get_tasks(TaskState.WAITING):
if will_refresh_task is not None:
will_refresh_task(my_task)
my_task.task_spec._update(my_task)
if did_refresh_task is not None:
did_refresh_task(my_task)
def get_tasks_from_spec_name(self, name, workflow=None):
return [t for t in self.get_tasks(workflow=workflow) if t.task_spec.name == name]

View File

@ -1860,20 +1860,16 @@ description = "A workflow framework and BPMN/DMN Processor"
category = "main"
optional = false
python-versions = "*"
develop = false
develop = true
[package.dependencies]
celery = "*"
configparser = "*"
dateparser = "*"
lxml = "*"
pytz = "*"
[package.source]
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "a6392d19061f623394f5705fb78af23673d3940d"
type = "directory"
url = "../SpiffWorkflow"
[[package]]
name = "SQLAlchemy"
@ -2256,7 +2252,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools"
[metadata]
lock-version = "1.1"
python-versions = ">=3.9,<3.11"
content-hash = "995be3a9a60b515b281f017ff32ff27a52ca178b1980611b348dccac6afb6b89"
content-hash = "9562df87977dc1c9273401fa150dd26742a6786d5162ec3e71844482207b4fba"
[metadata.files]
alabaster = [
@ -2621,7 +2617,6 @@ greenlet = [
{file = "greenlet-1.1.3.post0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:0a954002064ee919b444b19c1185e8cce307a1f20600f47d6f4b6d336972c809"},
{file = "greenlet-1.1.3.post0-cp39-cp39-win32.whl", hash = "sha256:2ccdc818cc106cc238ff7eba0d71b9c77be868fdca31d6c3b1347a54c9b187b2"},
{file = "greenlet-1.1.3.post0-cp39-cp39-win_amd64.whl", hash = "sha256:91a84faf718e6f8b888ca63d0b2d6d185c8e2a198d2a7322d75c303e7097c8b7"},
{file = "greenlet-1.1.3.post0.tar.gz", hash = "sha256:f5e09dc5c6e1796969fd4b775ea1417d70e49a5df29aaa8e5d10675d9e11872c"},
]
gunicorn = [
{file = "gunicorn-20.1.0-py3-none-any.whl", hash = "sha256:9dcc4547dbb1cb284accfb15ab5667a0e5d1881cc443e0677b4882a4067a807e"},
@ -2946,10 +2941,7 @@ orjson = [
{file = "orjson-3.8.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b68a42a31f8429728183c21fb440c21de1b62e5378d0d73f280e2d894ef8942e"},
{file = "orjson-3.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:ff13410ddbdda5d4197a4a4c09969cb78c722a67550f0a63c02c07aadc624833"},
{file = "orjson-3.8.0-cp310-none-win_amd64.whl", hash = "sha256:2d81e6e56bbea44be0222fb53f7b255b4e7426290516771592738ca01dbd053b"},
{file = "orjson-3.8.0-cp311-cp311-macosx_10_7_x86_64.whl", hash = "sha256:200eae21c33f1f8b02a11f5d88d76950cd6fd986d88f1afe497a8ae2627c49aa"},
{file = "orjson-3.8.0-cp311-cp311-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:9529990f3eab54b976d327360aa1ff244a4b12cb5e4c5b3712fcdd96e8fe56d4"},
{file = "orjson-3.8.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e2defd9527651ad39ec20ae03c812adf47ef7662bdd6bc07dabb10888d70dc62"},
{file = "orjson-3.8.0-cp311-none-win_amd64.whl", hash = "sha256:b21c7af0ff6228ca7105f54f0800636eb49201133e15ddb80ac20c1ce973ef07"},
{file = "orjson-3.8.0-cp37-cp37m-macosx_10_7_x86_64.whl", hash = "sha256:9e6ac22cec72d5b39035b566e4b86c74b84866f12b5b0b6541506a080fb67d6d"},
{file = "orjson-3.8.0-cp37-cp37m-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:e2f4a5542f50e3d336a18cb224fc757245ca66b1fd0b70b5dd4471b8ff5f2b0e"},
{file = "orjson-3.8.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e1418feeb8b698b9224b1f024555895169d481604d5d884498c1838d7412794c"},
@ -3062,7 +3054,18 @@ py = [
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
]
pyasn1 = [
{file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
{file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
{file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
{file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
{file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
{file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
{file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
{file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
{file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
{file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
{file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
{file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
{file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
]
pycodestyle = [

View File

@ -27,8 +27,8 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
#SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
#SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}

View File

@ -557,10 +557,9 @@ class ProcessInstanceProcessor:
"lane_assignment_id": lane_assignment_id,
}
def save_spiff_step_details(self, bpmn_json: Optional[str]) -> None:
def save_spiff_step_details(self) -> None:
"""SaveSpiffStepDetails."""
if bpmn_json is None:
return
bpmn_json = self.serialize()
wf_json = json.loads(bpmn_json)
task_json = "{}"
if "tasks" in wf_json:
@ -979,11 +978,18 @@ class ProcessInstanceProcessor:
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
self.increment_spiff_step()
try:
self.bpmn_process_instance.refresh_waiting_tasks()
self.bpmn_process_instance.do_engine_steps(exit_at=exit_at)
self.bpmn_process_instance.refresh_waiting_tasks(
will_refresh_task=lambda t: self.increment_spiff_step(),
did_refresh_task=lambda t: self.save_spiff_step_details(),
)
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=lambda t: self.increment_spiff_step(),
did_complete_task=lambda t: self.save_spiff_step_details(),
)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
@ -993,10 +999,6 @@ class ProcessInstanceProcessor:
finally:
if save:
self.save()
bpmn_json = self.process_instance_model.bpmn_json
else:
bpmn_json = self.serialize()
self.save_spiff_step_details(bpmn_json)
def cancel_notify(self) -> None:
"""Cancel_notify."""
@ -1009,6 +1011,7 @@ class ProcessInstanceProcessor:
# A little hackly, but make the bpmn_process_instance catch a cancel event.
bpmn_process_instance.signal("cancel") # generate a cancel signal.
bpmn_process_instance.catch(CancelEventDefinition())
# Due to this being static, can't save granular step details in this case
bpmn_process_instance.do_engine_steps()
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
@ -1109,8 +1112,7 @@ class ProcessInstanceProcessor:
"""Complete_task."""
self.increment_spiff_step()
self.bpmn_process_instance.complete_task_from_id(task.id)
bpmn_json = self.serialize()
self.save_spiff_step_details(bpmn_json)
self.save_spiff_step_details()
def get_data(self) -> dict[str, Any]:
"""Get_data."""