From 9313a9f73a6d8653aa0805d09af01afee34a1a2e Mon Sep 17 00:00:00 2001 From: Kevin Burnett <18027+burnettk@users.noreply.github.com> Date: Thu, 10 Aug 2023 05:54:49 -0700 Subject: [PATCH] Feature/event payloads part 2 (#401) * Revert "Revert "Feature/event payloads (#393)"" This reverts commit 95fafb7af118cbe81ca20600bbb83e54e0936a5a. * Revert "Revert "poet not available in container"" This reverts commit 140220498c284163dc02f8075fac949dff4de9e5. * Revert "Revert "Run event payloads data migration from background processor (#399)"" This reverts commit 2afced3a51cda18491bc23b344bf2bada41393d5. * Revert "Revert "using new spiff api to get info about events. w/ elizabeth"" This reverts commit af857fee229fc89824e45a5d36ab0178e284ed44. * Revert "Revert "fix tests for waiting_event_can_be_skipped"" This reverts commit 886e6bd42a94390bf4d863ec79bff0a3831f6fcf. * push image for preview env * default scripts to localhost w/ burnettk * use the bugfix/update-split-task-inputs spiffworkflow branch w/ burnettk * removed debug json files * use main for spiffworkflow * do not attempt to highlight non-diagram boundary items w/ burnettk * updated SpiffWorkflow to fix multiple signal event issue w/ burnettk --------- Co-authored-by: burnettk Co-authored-by: jasquat --- .../docker_image_for_main_builds.yml | 1 + bin/run_pyl | 2 +- .../bin/boot_server_in_docker | 4 + .../bin/data_migrations/version_1_3.py | 21 ++ spiffworkflow-backend/bin/get_token | 4 +- .../bin/login_with_user_list | 6 +- .../bin/start_blocking_appscheduler.py | 15 +- .../migrations/versions/214e0c5fb418_.py | 32 +++ spiffworkflow-backend/poetry.lock | 2 +- .../data_migrations/version_1_3.py | 195 ++++++++++++++++++ .../models/message_instance_correlation.py | 3 + .../services/message_service.py | 35 +++- .../services/process_instance_processor.py | 9 +- .../services/process_instance_service.py | 13 +- .../services/workflow_execution_service.py | 24 ++- .../specs/start_event.py | 12 +- .../test_process_to_call.bpmn | 15 +- .../integration/test_process_api.py | 2 - .../unit/test_process_instance_service.py | 33 +-- spiffworkflow-frontend/package-lock.json | 2 +- .../src/components/ReactDiagramEditor.tsx | 4 +- 21 files changed, 369 insertions(+), 65 deletions(-) create mode 100644 spiffworkflow-backend/bin/data_migrations/version_1_3.py create mode 100644 spiffworkflow-backend/migrations/versions/214e0c5fb418_.py create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_1_3.py diff --git a/.github/workflows/docker_image_for_main_builds.yml b/.github/workflows/docker_image_for_main_builds.yml index 5054a003..640be46e 100644 --- a/.github/workflows/docker_image_for_main_builds.yml +++ b/.github/workflows/docker_image_for_main_builds.yml @@ -31,6 +31,7 @@ on: push: branches: - main + - feature/event-payloads-part-2 - spiffdemo jobs: diff --git a/bin/run_pyl b/bin/run_pyl index 57622a35..dfec03de 100755 --- a/bin/run_pyl +++ b/bin/run_pyl @@ -38,7 +38,7 @@ function run_autofixers() { asdf reshim python fi - python_dirs=$(get_python_dirs) + python_dirs="$(get_python_dirs) bin" ruff --fix $python_dirs || echo '' } diff --git a/spiffworkflow-backend/bin/boot_server_in_docker b/spiffworkflow-backend/bin/boot_server_in_docker index 96d3d79f..2e5c0444 100755 --- a/spiffworkflow-backend/bin/boot_server_in_docker +++ b/spiffworkflow-backend/bin/boot_server_in_docker @@ -88,6 +88,10 @@ if [[ -z "${SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER:-}" ]]; then export SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER fi +# DELETE after this 
runs on all necessary environments +# TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations +poetry run python ./bin/data_migrations/version_1_3.py + # --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh export IS_GUNICORN="true" # THIS MUST BE THE LAST COMMAND! diff --git a/spiffworkflow-backend/bin/data_migrations/version_1_3.py b/spiffworkflow-backend/bin/data_migrations/version_1_3.py new file mode 100644 index 00000000..64324447 --- /dev/null +++ b/spiffworkflow-backend/bin/data_migrations/version_1_3.py @@ -0,0 +1,21 @@ +import time + +from spiffworkflow_backend import create_app +from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree + + +def main() -> None: + app = create_app() + start_time = time.time() + + with app.app_context(): + VersionOneThree().run() + + end_time = time.time() + print( + f"done running data migration from ./bin/data_migrations/version_1_3.py. took {end_time - start_time} seconds" + ) + + +if __name__ == "__main__": + main() diff --git a/spiffworkflow-backend/bin/get_token b/spiffworkflow-backend/bin/get_token index 3a0f74e0..ab8ef6c5 100755 --- a/spiffworkflow-backend/bin/get_token +++ b/spiffworkflow-backend/bin/get_token @@ -21,11 +21,9 @@ set -o errtrace -o errexit -o nounset -o pipefail # ./bin/get_token ciadmin1 ciadmin1 '%2Fprocess-models' if [[ -z "${BACKEND_BASE_URL:-}" ]]; then - # BACKEND_BASE_URL=http://localhost:7000 - BACKEND_BASE_URL=https://api.dev.spiffworkflow.org + BACKEND_BASE_URL=http://localhost:7000 fi if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then - # KEYCLOAK_BASE_URL=http://localhost:7002 if grep -qE "spiffworkflow.org" <<<"$BACKEND_BASE_URL" ; then env_domain=$(hot_sed -E 's/.*api\.(\w+\.spiffworkflow.org).*/\1/' <<<"${BACKEND_BASE_URL}") KEYCLOAK_BASE_URL="https://keycloak.${env_domain}" diff --git a/spiffworkflow-backend/bin/login_with_user_list b/spiffworkflow-backend/bin/login_with_user_list index 40192577..6dc66241 100755 --- a/spiffworkflow-backend/bin/login_with_user_list +++ b/spiffworkflow-backend/bin/login_with_user_list @@ -10,12 +10,10 @@ set -o errtrace -o errexit -o nounset -o pipefail script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then - # export KEYCLOAK_BASE_URL=http://localhost:7002 - export KEYCLOAK_BASE_URL=https://keycloak.dev.spiffworkflow.org + export KEYCLOAK_BASE_URL=http://localhost:7002 fi if [[ -z "${BACKEND_BASE_URL:-}" ]]; then - # export BACKEND_BASE_URL=http://localhost:7000 - export BACKEND_BASE_URL=https://api.dev.spiffworkflow.org + export BACKEND_BASE_URL=http://localhost:7000 fi user_list="${1}" diff --git a/spiffworkflow-backend/bin/start_blocking_appscheduler.py b/spiffworkflow-backend/bin/start_blocking_appscheduler.py index 61b753f2..89a40b44 100755 --- a/spiffworkflow-backend/bin/start_blocking_appscheduler.py +++ b/spiffworkflow-backend/bin/start_blocking_appscheduler.py @@ -4,16 +4,29 @@ import time from apscheduler.schedulers.background import BlockingScheduler # type: ignore from spiffworkflow_backend import create_app from spiffworkflow_backend import start_scheduler +from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree from spiffworkflow_backend.helpers.db_helper import try_to_connect def main() -> None: - """Main.""" + seconds_to_wait = 300 + print(f"sleeping for {seconds_to_wait} seconds to give the api container time to run the migration") + 
time.sleep(seconds_to_wait) + print("done sleeping") + + print("running data migration from background processor") app = create_app() start_time = time.time() + with app.app_context(): try_to_connect(start_time) + VersionOneThree().run() + end_time = time.time() + print( + f"done running data migration from background processor. took {end_time - start_time} seconds. starting" + " scheduler" + ) start_scheduler(app, BlockingScheduler) diff --git a/spiffworkflow-backend/migrations/versions/214e0c5fb418_.py b/spiffworkflow-backend/migrations/versions/214e0c5fb418_.py new file mode 100644 index 00000000..ad17339d --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/214e0c5fb418_.py @@ -0,0 +1,32 @@ +"""empty message + +Revision ID: 214e0c5fb418 +Revises: 64adf34a98db +Create Date: 2023-07-11 14:52:15.825136 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '214e0c5fb418' +down_revision = '64adf34a98db' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op: + batch_op.add_column(sa.Column('correlation_key_names', sa.JSON(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op: + batch_op.drop_column('correlation_key_names') + + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 6dbfde2c..506d1a7a 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -2285,7 +2285,7 @@ lxml = "*" type = "git" url = "https://github.com/sartography/SpiffWorkflow" reference = "main" -resolved_reference = "9bd018e596d42b897da67ff894f458d6bc40bdf9" +resolved_reference = "0adfc8cbaec80d36f98a4136434e960f666fcfe2" [[package]] name = "sqlalchemy" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_1_3.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_1_3.py new file mode 100644 index 00000000..444aeb83 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_1_3.py @@ -0,0 +1,195 @@ +import copy +import json +import uuid +from hashlib import sha256 + +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.task import Task +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +from spiffworkflow_backend.models.task_definition import TaskDefinitionModel +from sqlalchemy import or_ +from sqlalchemy.orm.attributes import flag_modified + + +class VersionOneThree: + """Migrates data in the database to be compatible with SpiffWorkflow at git revision ebcdde95. 
+
+    Adapts the following serializer migration file from SpiffWorkflow to work with the backend's database:
+    https://github.com/sartography/SpiffWorkflow/blob/main/SpiffWorkflow/bpmn/serializer/migration/version_1_3.py
+    """
+
+    def run(self) -> None:
+        print("start VersionOneThree.run")
+        task_definitions = self.get_relevant_task_definitions()
+        print(f"found relevant task_definitions: {len(task_definitions)}")
+        for task_definition in task_definitions:
+            self.process_task_definition(task_definition)
+            relating_task_models = TaskModel.query.filter_by(task_definition_id=task_definition.id).all()
+            for task_model in relating_task_models:
+                self.process_task_model(task_model, task_definition)
+
+        task_definitions_with_events = TaskDefinitionModel.query.filter(
+            or_(
+                TaskDefinitionModel.typename.like("%Event%"),  # type: ignore
+                TaskDefinitionModel.typename.in_(["SendTask", "ReceiveTask"]),  # type: ignore
+            )
+        ).all()
+        for tdwe in task_definitions_with_events:
+            self.update_event_definitions(tdwe)
+
+        db.session.commit()
+        print("end VersionOneThree.run")
+
+    def get_relevant_task_definitions(self) -> list[TaskDefinitionModel]:
+        task_definitions: list[TaskDefinitionModel] = TaskDefinitionModel.query.filter_by(
+            typename="_BoundaryEventParent"
+        ).all()
+        return task_definitions
+
+    def process_task_definition(self, task_definition: TaskDefinitionModel) -> None:
+        task_definition.typename = "BoundaryEventSplit"
+        task_definition.bpmn_identifier = task_definition.bpmn_identifier.replace(
+            "BoundaryEventParent", "BoundaryEventSplit"
+        )
+
+        properties_json = copy.copy(task_definition.properties_json)
+        properties_json.pop("main_child_task_spec")
+        properties_json["typename"] = task_definition.typename
+        properties_json["name"] = task_definition.bpmn_identifier
+        task_definition.properties_json = properties_json
+        flag_modified(task_definition, "properties_json")  # type: ignore
+        db.session.add(task_definition)
+
+        join_properties_json = {
+            "name": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
+            "manual": False,
+            "bpmn_id": None,
+            "lookahead": 2,
+            "inputs": properties_json["outputs"],
+            "outputs": [],
+            "split_task": task_definition.bpmn_identifier,
+            "threshold": None,
+            "cancel": True,
+            "typename": "BoundaryEventJoin",
+        }
+
+        join_task_definition = TaskDefinitionModel(
+            bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
+            bpmn_identifier=join_properties_json["name"],
+            typename=join_properties_json["typename"],
+            properties_json=join_properties_json,
+        )
+        db.session.add(join_task_definition)
+
+        for parent_bpmn_identifier in properties_json["inputs"]:
+            parent_task_definition = TaskDefinitionModel.query.filter_by(
+                bpmn_identifier=parent_bpmn_identifier,
+                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
+            ).first()
+            parent_task_definition.properties_json["outputs"] = [
+                name.replace("BoundaryEventParent", "BoundaryEventSplit")
+                for name in parent_task_definition.properties_json["outputs"]
+            ]
+            flag_modified(parent_task_definition, "properties_json")  # type: ignore
+            db.session.add(parent_task_definition)
+
+        for child_bpmn_identifier in properties_json["outputs"]:
+            child_task_definition = TaskDefinitionModel.query.filter_by(
+                bpmn_identifier=child_bpmn_identifier,
+                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
+            ).first()
+            child_task_definition.properties_json["outputs"].append(join_task_definition.bpmn_identifier)
+            child_task_definition.properties_json["inputs"] = [
+                
name.replace("BoundaryEventParent", "BoundaryEventSplit") + for name in child_task_definition.properties_json["inputs"] + ] + flag_modified(child_task_definition, "properties_json") # type: ignore + db.session.add(child_task_definition) + + def process_task_model(self, task_model: TaskModel, task_definition: TaskDefinitionModel) -> None: + task_model.properties_json["task_spec"] = task_definition.bpmn_identifier + flag_modified(task_model, "properties_json") # type: ignore + db.session.add(task_model) + + child_task_models = [] + all_children_completed = True + + # Ruff keeps complaining unless it's done like this + blank_json = json.dumps({}) + blank_json_data_hash = sha256(blank_json.encode("utf8")).hexdigest() + + for child_task_guid in task_model.properties_json["children"]: + child_task_model = TaskModel.query.filter_by(guid=child_task_guid).first() + if child_task_model is None: + continue + if child_task_model.state not in ["COMPLETED", "CANCELLED"]: + all_children_completed = False + child_task_models.append(child_task_model) + + for child_task_model in child_task_models: + if child_task_model.state == "CANCELLED": + # Cancelled tasks don't have children + continue + + new_task_state = None + start_in_seconds = child_task_model.start_in_seconds + end_in_seconds = None + + if child_task_model.state in ["MAYBE", "LIKELY", "FUTURE"]: + new_task_state = child_task_model.state + elif child_task_model.state in ["WAITING", "READY", "STARTED"]: + new_task_state = "FUTURE" + elif child_task_model.state == "COMPLETED": + if all_children_completed: + new_task_state = "COMPLETED" + end_in_seconds = child_task_model.end_in_seconds + else: + new_task_state = "WAITING" + elif child_task_model.state == "ERROR": + new_task_state = "WAITING" + else: + raise Exception(f"Unknown state: {child_task_model.state} for {child_task_model.guild}") + + new_task_properties_json = { + "id": str(uuid.uuid4()), + "parent": child_task_model.guid, + "children": [], + "state": Task.task_state_name_to_int(new_task_state), + "task_spec": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"), + "last_state_change": None, + "triggered": False, + "internal_data": {}, + } + + new_task_model = TaskModel( + guid=new_task_properties_json["id"], + bpmn_process_id=task_model.bpmn_process_id, + process_instance_id=task_model.process_instance_id, + task_definition_id=task_model.task_definition_id, + state=new_task_state, + properties_json=new_task_properties_json, + start_in_seconds=start_in_seconds, + end_in_seconds=end_in_seconds, + json_data_hash=blank_json_data_hash, + python_env_data_hash=blank_json_data_hash, + ) + db.session.add(new_task_model) + + child_task_model.properties_json["children"].append(new_task_model.guid) + flag_modified(child_task_model, "properties_json") # type: ignore + db.session.add(child_task_model) + + def update_event_definitions(self, task_definition: TaskDefinitionModel) -> None: + if "event_definition" in task_definition.properties_json: + properties_json = copy.copy(task_definition.properties_json) + properties_json["event_definition"].pop("internal", None) + properties_json["event_definition"].pop("external", None) + if "escalation_code" in properties_json["event_definition"]: + properties_json["event_definition"]["code"] = properties_json["event_definition"].pop( + "escalation_code" + ) + if "error_code" in properties_json["event_definition"]: + properties_json["event_definition"]["code"] = properties_json["event_definition"].pop("error_code") + 
task_definition.properties_json = properties_json + flag_modified(task_definition, "properties_json") # type: ignore + db.session.add(task_definition) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py index a8b062aa..38f13557 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py @@ -29,8 +29,11 @@ class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel): id = db.Column(db.Integer, primary_key=True) message_instance_id = db.Column(ForeignKey(MessageInstanceModel.id), nullable=False, index=True) # type: ignore + name: str = db.Column(db.String(50), nullable=False, index=True) retrieval_expression: str = db.Column(db.String(255)) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) + correlation_key_names: list = db.Column(db.JSON) + message_instance = relationship("MessageInstanceModel", back_populates="correlation_rules") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 9b17176e..685f1f4a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -1,4 +1,6 @@ -from SpiffWorkflow.bpmn.workflow import BpmnMessage # type: ignore +from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore +from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty # type: ignore +from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition # type: ignore from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageStatuses @@ -72,8 +74,7 @@ class MessageService: cls.process_message_receive( receiving_process, message_instance_receive, - message_instance_send.name, - message_instance_send.payload, + message_instance_send, ) message_instance_receive.status = "completed" message_instance_receive.counterpart_id = message_instance_send.id @@ -130,7 +131,7 @@ class MessageService: ( ( "Process instance cannot be found for queued message:" - f" {message_instance_receive.id}.Tried with id" + f" {message_instance_receive.id}. 
Tried with id" f" {message_instance_receive.process_instance_id}" ), ) @@ -141,16 +142,28 @@ class MessageService: def process_message_receive( process_instance_receive: ProcessInstanceModel, message_instance_receive: MessageInstanceModel, - message_model_name: str, - message_payload: dict, + message_instance_send: MessageInstanceModel, ) -> None: - bpmn_message = BpmnMessage( - None, - message_model_name, - message_payload, + correlation_properties = [] + for cr in message_instance_receive.correlation_rules: + correlation_properties.append( + CorrelationProperty( + name=cr.name, + retrieval_expression=cr.retrieval_expression, + correlation_keys=cr.correlation_key_names, + ) + ) + bpmn_message = MessageEventDefinition( + name=message_instance_send.name, + correlation_properties=correlation_properties, + ) + bpmn_event = BpmnEvent( + event_definition=bpmn_message, + payload=message_instance_send.payload, + correlations=message_instance_send.correlation_keys, ) processor_receive = ProcessInstanceProcessor(process_instance_receive) - processor_receive.bpmn_process_instance.catch_bpmn_message(bpmn_message) + processor_receive.bpmn_process_instance.send_event(bpmn_event) processor_receive.do_engine_steps(save=True) message_instance_receive.status = MessageStatuses.completed.value db.session.add(message_instance_receive) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 4d9e6536..7cb6d474 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -24,6 +24,7 @@ from flask import current_app from lxml import etree # type: ignore from lxml.etree import XMLSyntaxError # type: ignore from RestrictedPython import safe_globals # type: ignore +from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore @@ -1089,10 +1090,12 @@ class ProcessInstanceProcessor: """Send an event to the workflow.""" payload = event_data.pop("payload", None) event_definition = self._event_serializer.registry.restore(event_data) - if payload is not None: - event_definition.payload = payload + bpmn_event = BpmnEvent( + event_definition=event_definition, + payload=payload, + ) try: - self.bpmn_process_instance.catch(event_definition) + self.bpmn_process_instance.send_event(bpmn_event) except Exception as e: print(e) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 9b84649e..36395588 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -10,8 +10,9 @@ from urllib.parse import unquote import sentry_sdk from flask import current_app from flask import g -from SpiffWorkflow.bpmn.specs.control import _BoundaryEventParent # type: ignore -from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition # type: ignore +from SpiffWorkflow.bpmn.event import PendingBpmnEvent # type: ignore +from 
SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit  # type: ignore
+from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
 from spiffworkflow_backend import db
 from spiffworkflow_backend.exceptions.api_error import ApiError
@@ -174,16 +175,16 @@ class ProcessInstanceService:
         db.session.commit()
 
     @classmethod
-    def waiting_event_can_be_skipped(cls, waiting_event: dict[str, Any], now_in_utc: datetime) -> bool:
+    def waiting_event_can_be_skipped(cls, waiting_event: PendingBpmnEvent, now_in_utc: datetime) -> bool:
         #
         # over time this function can gain more knowledge of different event types,
         # for now we are just handling Duration Timer events.
         #
         # example: {'event_type': 'Duration Timer', 'name': None, 'value': '2023-04-27T20:15:10.626656+00:00'}
         #
-        spiff_event_type = waiting_event.get("event_type")
+        spiff_event_type = waiting_event.event_type
         if spiff_event_type == "DurationTimerEventDefinition":
-            event_value = waiting_event.get("value")
+            event_value = waiting_event.value
             if event_value is not None:
                 event_datetime = TimerEventDefinition.get_datetime(event_value)
                 return event_datetime > now_in_utc  # type: ignore
@@ -199,7 +200,7 @@
     @classmethod
     def ready_user_task_has_associated_timer(cls, processor: ProcessInstanceProcessor) -> bool:
         for ready_user_task in processor.bpmn_process_instance.get_ready_user_tasks():
-            if isinstance(ready_user_task.parent.task_spec, _BoundaryEventParent):
+            if isinstance(ready_user_task.parent.task_spec, BoundaryEventSplit):
                 return True
         return False
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index 9440fe26..bb16108e 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -9,6 +9,7 @@ from uuid import UUID
 
 from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException  # type: ignore
 from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer  # type: ignore
+from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition  # type: ignore
 from SpiffWorkflow.bpmn.workflow import BpmnWorkflow  # type: ignore
 from SpiffWorkflow.exceptions import SpiffWorkflowException  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
@@ -432,15 +433,21 @@ class WorkflowExecutionService:
             self.process_instance_saver()
 
     def process_bpmn_messages(self) -> None:
-        bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
-        for bpmn_message in bpmn_messages:
+        # FIXME: get_events clears out the events, so if there are other events we care
+        # about, this will clear them out as well.
+        # Right now we only care about messages though. 
+ bpmn_events = self.bpmn_process_instance.get_events() + for bpmn_event in bpmn_events: + if not isinstance(bpmn_event.event_definition, MessageEventDefinition): + continue + bpmn_message = bpmn_event.event_definition message_instance = MessageInstanceModel( process_instance_id=self.process_instance_model.id, user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up message_type="send", name=bpmn_message.name, - payload=bpmn_message.payload, + payload=bpmn_event.payload, correlation_keys=self.bpmn_process_instance.correlations, ) db.session.add(message_instance) @@ -450,21 +457,21 @@ class WorkflowExecutionService: bpmn_process_correlations = self.bpmn_process_instance.correlations bpmn_process.properties_json["correlations"] = bpmn_process_correlations # update correlations correctly but always null out bpmn_messages since they get cleared out later - bpmn_process.properties_json["bpmn_messages"] = [] + bpmn_process.properties_json["bpmn_events"] = [] db.session.add(bpmn_process) db.session.commit() def queue_waiting_receive_messages(self) -> None: waiting_events = self.bpmn_process_instance.waiting_events() - waiting_message_events = filter(lambda e: e["event_type"] == "MessageEventDefinition", waiting_events) + waiting_message_events = filter(lambda e: e.event_type == "MessageEventDefinition", waiting_events) for event in waiting_message_events: # Ensure we are only creating one message instance for each waiting message if ( MessageInstanceModel.query.filter_by( process_instance_id=self.process_instance_model.id, message_type="receive", - name=event["name"], + name=event.name, ).count() > 0 ): @@ -475,14 +482,15 @@ class WorkflowExecutionService: process_instance_id=self.process_instance_model.id, user_id=self.process_instance_model.process_initiator_id, message_type="receive", - name=event["name"], + name=event.name, correlation_keys=self.bpmn_process_instance.correlations, ) - for correlation_property in event["value"]: + for correlation_property in event.value: message_correlation = MessageInstanceCorrelationRuleModel( message_instance=message_instance, name=correlation_property.name, retrieval_expression=correlation_property.retrieval_expression, + correlation_key_names=correlation_property.correlation_keys, ) db.session.add(message_correlation) db.session.add(message_instance) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py index 2e5b004a..a0d3bafc 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py @@ -6,17 +6,17 @@ from SpiffWorkflow.bpmn.parser.util import full_tag # type: ignore from SpiffWorkflow.bpmn.serializer.task_spec import EventConverter # type: ignore from SpiffWorkflow.bpmn.serializer.task_spec import StartEventConverter as DefaultStartEventConverter from SpiffWorkflow.bpmn.specs.defaults import StartEvent as DefaultStartEvent # type: ignore -from SpiffWorkflow.bpmn.specs.event_definitions import CycleTimerEventDefinition # type: ignore -from SpiffWorkflow.bpmn.specs.event_definitions import DurationTimerEventDefinition -from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition -from SpiffWorkflow.bpmn.specs.event_definitions import TimeDateEventDefinition -from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions.simple 
import NoneEventDefinition # type: ignore +from SpiffWorkflow.bpmn.specs.event_definitions.timer import CycleTimerEventDefinition # type: ignore +from SpiffWorkflow.bpmn.specs.event_definitions.timer import DurationTimerEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimeDateEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore StartConfiguration = tuple[int, int, int] -# TODO: cylce timers and repeat counts? +# TODO: cycle timers and repeat counts? class StartEvent(DefaultStartEvent): # type: ignore diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn index 064365d8..a03a935e 100644 --- a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn +++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn @@ -1,5 +1,5 @@ - + Flow_089aeua @@ -51,9 +51,14 @@ set_in_test_process_to_call_script = 1 - + + + + the_var + + @@ -69,10 +74,10 @@ - - + + - + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 517de7ae..43c545da 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2412,8 +2412,6 @@ class TestProcessApi(BaseTest): data: dict = { "correlation_properties": [], "expression": None, - "external": True, - "internal": False, "payload": {"message": "message 1"}, "name": "Message 1", "typename": "MessageEventDefinition", diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py index e034e5ca..910cf577 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py @@ -2,6 +2,7 @@ from datetime import datetime from datetime import timezone from flask.app import Flask +from SpiffWorkflow.bpmn.event import PendingBpmnEvent # type: ignore from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService @@ -198,39 +199,47 @@ class TestProcessInstanceService(BaseTest): self._check_sample_file_data_model("File", 1, models[1]) def test_does_not_skip_events_it_does_not_know_about(self) -> None: + name = None + event_type = "Unknown" + value = "2023-04-27T20:15:10.626656+00:00" + pending_event = PendingBpmnEvent(name, event_type, value) assert not ( ProcessInstanceService.waiting_event_can_be_skipped( - {"event_type": "Unknown", "name": None, "value": "2023-04-27T20:15:10.626656+00:00"}, + pending_event, datetime.now(timezone.utc), ) ) def test_does_skip_duration_timer_events_for_the_future(self) -> None: + name = None + event_type = "DurationTimerEventDefinition" + value = "2023-04-27T20:15:10.626656+00:00" + pending_event = PendingBpmnEvent(name, event_type, value) assert ProcessInstanceService.waiting_event_can_be_skipped( - 
{"event_type": "DurationTimerEventDefinition", "name": None, "value": "2023-04-27T20:15:10.626656+00:00"}, + pending_event, datetime.fromisoformat("2023-04-26T20:15:10.626656+00:00"), ) def test_does_not_skip_duration_timer_events_for_the_past(self) -> None: + name = None + event_type = "DurationTimerEventDefinition" + value = "2023-04-27T20:15:10.626656+00:00" + pending_event = PendingBpmnEvent(name, event_type, value) assert not ( ProcessInstanceService.waiting_event_can_be_skipped( - { - "event_type": "DurationTimerEventDefinition", - "name": None, - "value": "2023-04-27T20:15:10.626656+00:00", - }, + pending_event, datetime.fromisoformat("2023-04-28T20:15:10.626656+00:00"), ) ) def test_does_not_skip_duration_timer_events_for_now(self) -> None: + name = None + event_type = "DurationTimerEventDefinition" + value = "2023-04-27T20:15:10.626656+00:00" + pending_event = PendingBpmnEvent(name, event_type, value) assert not ( ProcessInstanceService.waiting_event_can_be_skipped( - { - "event_type": "DurationTimerEventDefinition", - "name": None, - "value": "2023-04-27T20:15:10.626656+00:00", - }, + pending_event, datetime.fromisoformat("2023-04-27T20:15:10.626656+00:00"), ) ) diff --git a/spiffworkflow-frontend/package-lock.json b/spiffworkflow-frontend/package-lock.json index d6a4ac61..5787b210 100644 --- a/spiffworkflow-frontend/package-lock.json +++ b/spiffworkflow-frontend/package-lock.json @@ -38375,7 +38375,7 @@ }, "bpmn-js-spiffworkflow": { "version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#9dcca6c80b8ab8ed0d79658456047b90e8483541", - "from": "bpmn-js-spiffworkflow@https://github.com/sartography/bpmn-js-spiffworkflow.git#main", + "from": "bpmn-js-spiffworkflow@github:sartography/bpmn-js-spiffworkflow#main", "requires": { "inherits": "^2.0.4", "inherits-browser": "^0.0.1", diff --git a/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx b/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx index 75e25374..764fb361 100644 --- a/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx +++ b/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx @@ -379,7 +379,9 @@ export default function ReactDiagramEditor({ return ( !taskSpecsThatCannotBeHighlighted.includes(taskBpmnId) && !taskBpmnId.match(/EndJoin/) && - !taskBpmnId.match(/BoundaryEventParent/) + !taskBpmnId.match(/BoundaryEventParent/) && + !taskBpmnId.match(/BoundaryEventJoin/) && + !taskBpmnId.match(/BoundaryEventSplit/) ); }