diff --git a/spiffworkflow-backend/migrations/versions/44a8f46cc508_.py b/spiffworkflow-backend/migrations/versions/0c7428378d6e_.py similarity index 95% rename from spiffworkflow-backend/migrations/versions/44a8f46cc508_.py rename to spiffworkflow-backend/migrations/versions/0c7428378d6e_.py index da70af01..99d8a2ee 100644 --- a/spiffworkflow-backend/migrations/versions/44a8f46cc508_.py +++ b/spiffworkflow-backend/migrations/versions/0c7428378d6e_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 44a8f46cc508 +Revision ID: 0c7428378d6e Revises: -Create Date: 2023-04-17 15:40:28.658588 +Create Date: 2023-04-20 14:05:44.779453 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '44a8f46cc508' +revision = '0c7428378d6e' down_revision = None branch_labels = None depends_on = None @@ -84,6 +84,15 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('uri') ) + op.create_table('process_caller_cache', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_identifier', sa.String(length=255), nullable=True), + sa.Column('calling_process_identifier', sa.String(length=255), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('process_caller_cache', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_process_caller_cache_process_identifier'), ['process_identifier'], unique=False) + op.create_table('spec_reference_cache', sa.Column('id', sa.Integer(), nullable=False), sa.Column('identifier', sa.String(length=255), nullable=True), @@ -463,6 +472,21 @@ def upgrade(): batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'), ['message_instance_id'], unique=False) batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_name'), ['name'], unique=False) + op.create_table('process_instance_error_detail', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_event_id', sa.Integer(), nullable=False), + sa.Column('message', sa.String(length=1024), nullable=False), + sa.Column('stacktrace', sa.JSON(), nullable=False), + sa.Column('task_line_number', sa.Integer(), nullable=True), + sa.Column('task_offset', sa.Integer(), nullable=True), + sa.Column('task_line_contents', sa.String(length=255), nullable=True), + sa.Column('task_trace', sa.JSON(), nullable=True), + sa.ForeignKeyConstraint(['process_instance_event_id'], ['process_instance_event.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'), ['process_instance_event_id'], unique=False) + op.create_table('human_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False), @@ -486,6 +510,10 @@ def downgrade(): batch_op.drop_index(batch_op.f('ix_human_task_user_human_task_id')) op.drop_table('human_task_user') + with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id')) + + op.drop_table('process_instance_error_detail') with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op: batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_name')) batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id')) @@ -607,6 +635,10 @@ def downgrade(): batch_op.drop_index(batch_op.f('ix_spec_reference_cache_display_name')) op.drop_table('spec_reference_cache') + with op.batch_alter_table('process_caller_cache', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_process_caller_cache_process_identifier')) + + op.drop_table('process_caller_cache') op.drop_table('permission_target') with op.batch_alter_table('message_triggerable_process_model', schema=None) as batch_op: batch_op.drop_index(batch_op.f('ix_message_triggerable_process_model_process_model_identifier')) diff --git a/spiffworkflow-backend/migrations/versions/36241ec6747b_.py b/spiffworkflow-backend/migrations/versions/36241ec6747b_.py deleted file mode 100644 index dc24b702..00000000 --- a/spiffworkflow-backend/migrations/versions/36241ec6747b_.py +++ /dev/null @@ -1,39 +0,0 @@ -"""empty message - -Revision ID: 36241ec6747b -Revises: 44a8f46cc508 -Create Date: 2023-04-19 10:31:23.202482 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '36241ec6747b' -down_revision = '44a8f46cc508' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('process_caller_cache', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('process_identifier', sa.String(length=255), nullable=True), - sa.Column('calling_process_identifier', sa.String(length=255), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - with op.batch_alter_table('process_caller_cache', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_process_caller_cache_process_identifier'), ['process_identifier'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('process_caller_cache', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_process_caller_cache_process_identifier')) - - op.drop_table('process_caller_cache') - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 8904e46b..f600d2d3 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. [[package]] name = "alabaster" @@ -2796,6 +2796,8 @@ files = [ {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:045e0626baf1c52e5527bd5db361bc83180faaba2ff586e763d3d5982a876a9e"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_12_6_arm64.whl", hash = "sha256:721bc4ba4525f53f6a611ec0967bdcee61b31df5a56801281027a3a6d1c2daf5"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:41d0f1fa4c6830176eef5b276af04c89320ea616655d01327d5ce65e50575c94"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win32.whl", hash = "sha256:f6d3d39611ac2e4f62c3128a9eed45f19a6608670c5a2f4f07f24e8de3441d38"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win_amd64.whl", hash = "sha256:da538167284de58a52109a9b89b8f6a53ff8437dd6dc26d33b57bf6699153122"}, {file = "ruamel.yaml.clib-0.2.7-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:4b3a93bb9bc662fc1f99c5c3ea8e623d8b23ad22f861eb6fce9377ac07ad6072"}, {file = "ruamel.yaml.clib-0.2.7-cp36-cp36m-macosx_12_0_arm64.whl", hash = "sha256:a234a20ae07e8469da311e182e70ef6b199d0fbeb6c6cc2901204dd87fb867e8"}, {file = "ruamel.yaml.clib-0.2.7-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:15910ef4f3e537eea7fe45f8a5d19997479940d9196f357152a09031c5be59f3"}, @@ -3299,7 +3301,7 @@ lxml = "*" type = "git" url = "https://github.com/sartography/SpiffWorkflow" reference = "main" -resolved_reference = "eceef15a73c7d5f7251be1a3933498eb97dfb5fe" +resolved_reference = "73886584b17c7d11a9713d0c4526ed41e411fc45" [[package]] name = "sqlalchemy" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml index 14193cac..e4ef39a0 100755 --- a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml +++ b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml @@ -2035,6 +2035,39 @@ paths: schema: $ref: "#/components/schemas/ProcessInstanceLog" + /event-error-details/{modified_process_model_identifier}/{process_instance_id}/{process_instance_event_id}: + parameters: + - name: process_instance_id + in: path + required: true + description: the id of the process instance + schema: + type: integer + - name: modified_process_model_identifier + in: path + required: true + description: The process_model_id, modified to replace slashes (/) + schema: + type: string + - name: process_instance_event_id + in: path + required: true + description: the id of the process instance event + schema: + type: integer + get: + tags: + - Process Instance Events + operationId: spiffworkflow_backend.routes.process_instance_events_controller.error_details + summary: returns the error details for a given process instance event. + responses: + "200": + description: list of types + content: + application/json: + schema: + $ref: "#/components/schemas/ProcessInstanceLog" + /secrets: parameters: - name: page diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index 5b35df3e..009a7486 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -205,7 +205,6 @@ class ProcessInstanceApi: next_task: Task | None, process_model_identifier: str, process_model_display_name: str, - completed_tasks: int, updated_at_in_seconds: int, ) -> None: """__init__.""" @@ -214,7 +213,6 @@ class ProcessInstanceApi: self.next_task = next_task # The next task that requires user input. self.process_model_identifier = process_model_identifier self.process_model_display_name = process_model_display_name - self.completed_tasks = completed_tasks self.updated_at_in_seconds = updated_at_in_seconds @@ -231,7 +229,6 @@ class ProcessInstanceApiSchema(Schema): "next_task", "process_model_identifier", "process_model_display_name", - "completed_tasks", "updated_at_in_seconds", ] unknown = INCLUDE @@ -248,7 +245,6 @@ class ProcessInstanceApiSchema(Schema): "next_task", "process_model_identifier", "process_model_display_name", - "completed_tasks", "updated_at_in_seconds", ] filtered_fields = {key: data[key] for key in keys} diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py new file mode 100644 index 00000000..804388bb --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from typing import Optional + +from sqlalchemy import ForeignKey +from sqlalchemy.orm import relationship + +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel + + +@dataclass +class ProcessInstanceErrorDetailModel(SpiffworkflowBaseDBModel): + __tablename__ = "process_instance_error_detail" + id: int = db.Column(db.Integer, primary_key=True) + + process_instance_event_id: int = db.Column(ForeignKey("process_instance_event.id"), nullable=False, index=True) + process_instance_event = relationship("ProcessInstanceEventModel") # type: ignore + + message: str = db.Column(db.String(1024), nullable=False) + + # this should be 65k in mysql + stacktrace: Optional[list] = db.Column(db.JSON, nullable=False) + + task_line_number: Optional[int] = db.Column(db.Integer) + task_offset: Optional[int] = db.Column(db.Integer) + task_line_contents: Optional[str] = db.Column(db.String(255)) + task_trace: Optional[list] = db.Column(db.JSON) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py index fe920b57..3a883e5c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import Any from sqlalchemy import ForeignKey +from sqlalchemy.orm import relationship from sqlalchemy.orm import validates from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum @@ -13,6 +14,7 @@ from spiffworkflow_backend.models.user import UserModel # event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same. class ProcessInstanceEventType(SpiffEnum): + process_instance_error = "process_instance_error" process_instance_resumed = "process_instance_resumed" process_instance_rewound_to_task = "process_instance_rewound_to_task" process_instance_suspended = "process_instance_suspended" @@ -37,6 +39,10 @@ class ProcessInstanceEventModel(SpiffworkflowBaseDBModel): user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore + error_details = relationship( + "ProcessInstanceErrorDetailModel", back_populates="process_instance_event", cascade="delete" + ) # type: ignore + @validates("event_type") def validate_event_type(self, key: str, value: Any) -> Any: return self.validate_enum_field(key, value, ProcessInstanceEventType) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py index 4e00a6ca..ef043a7e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py @@ -1,4 +1,3 @@ -"""Process_instance_metadata.""" from dataclasses import dataclass from sqlalchemy import ForeignKey @@ -10,8 +9,6 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel @dataclass class ProcessInstanceMetadataModel(SpiffworkflowBaseDBModel): - """ProcessInstanceMetadataModel.""" - __tablename__ = "process_instance_metadata" __table_args__ = (db.UniqueConstraint("process_instance_id", "key", name="process_instance_metadata_unique"),) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instance_events_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instance_events_controller.py index 5711f6bc..5831632b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instance_events_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instance_events_controller.py @@ -5,6 +5,7 @@ from flask import jsonify from flask import make_response from sqlalchemy import and_ +from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel @@ -91,3 +92,20 @@ def types() -> flask.wrappers.Response: task_types = [t.typename for t in query] event_types = ProcessInstanceEventType.list() return make_response(jsonify({"task_types": task_types, "event_types": event_types}), 200) + + +def error_details( + modified_process_model_identifier: str, + process_instance_id: int, + process_instance_event_id: int, +) -> flask.wrappers.Response: + process_instance_event = ProcessInstanceEventModel.query.filter_by(id=process_instance_event_id).first() + if process_instance_event is None: + raise ( + ApiError( + error_code="process_instance_event_cannot_be_found", + message=f"Process instance event cannot be found: {process_instance_event_id}", + status_code=400, + ) + ) + return make_response(jsonify(process_instance_event.error_details[0]), 200) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 6813f415..b8217fa3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -112,7 +112,6 @@ def process_instance_create( def process_instance_run( modified_process_model_identifier: str, process_instance_id: int, - do_engine_steps: bool = True, ) -> flask.wrappers.Response: """Process_instance_run.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) @@ -123,22 +122,21 @@ def process_instance_run( status_code=400, ) - processor = ProcessInstanceProcessor(process_instance) - - if do_engine_steps: - try: - processor.do_engine_steps(save=True) - except ( - ApiError, - ProcessInstanceIsNotEnqueuedError, - ProcessInstanceIsAlreadyLockedError, - ) as e: - ErrorHandlingService().handle_error(processor, e) - raise e - except Exception as e: - ErrorHandlingService().handle_error(processor, e) - # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes. - # we need to recurse through all last tasks if the last task is a call activity or subprocess. + processor = None + try: + processor = ProcessInstanceService.run_process_intance_with_processor(process_instance) + except ( + ApiError, + ProcessInstanceIsNotEnqueuedError, + ProcessInstanceIsAlreadyLockedError, + ) as e: + ErrorHandlingService.handle_error(process_instance, e) + raise e + except Exception as e: + ErrorHandlingService.handle_error(process_instance, e) + # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes. + # we need to recurse through all last tasks if the last task is a call activity or subprocess. + if processor is not None: task = processor.bpmn_process_instance.last_task raise ApiError.from_task( error_code="unknown_exception", @@ -146,15 +144,22 @@ def process_instance_run( status_code=400, task=task, ) from e + raise e - if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: - MessageService.correlate_all_message_instances() + if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: + MessageService.correlate_all_message_instances() - process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor) - process_instance_data = processor.get_data() - process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api) - process_instance_metadata["data"] = process_instance_data - return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json") + # for mypy + if processor is not None: + process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor) + process_instance_data = processor.get_data() + process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api) + process_instance_metadata["data"] = process_instance_data + return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json") + + # FIXME: this should never happen currently but it'd be ideal to always do this + # currently though it does not return next task so it cannnot be used to take the user to the next human task + return make_response(jsonify(process_instance), 200) def process_instance_terminate( @@ -172,7 +177,7 @@ def process_instance_terminate( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -193,7 +198,7 @@ def process_instance_suspend( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -214,7 +219,7 @@ def process_instance_resume( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 9baffd25..2a4ffc1d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -215,7 +215,7 @@ def task_data_update( ) if json_data_dict is not None: TaskService.insert_or_update_json_data_records({json_data_dict["hash"]: json_data_dict}) - ProcessInstanceProcessor.add_event_to_process_instance( + TaskService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.task_data_edited.value, task_guid=task_guid ) try: @@ -428,7 +428,7 @@ def _task_submit_shared( if save_as_draft: task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id) - ProcessInstanceService.update_form_task_data(processor, spiff_task, body, g.user) + ProcessInstanceService.update_form_task_data(process_instance, spiff_task, body, g.user) json_data_dict = TaskService.update_task_data_on_task_model_and_return_dict_if_updated( task_model, spiff_task.data, "json_data_hash" ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index 4407c6db..6d095104 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -1,66 +1,58 @@ -"""Error_handling_service.""" -from typing import Union - from flask import current_app from flask import g -from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.services.message_service import MessageService -from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, -) from spiffworkflow_backend.services.process_model_service import ProcessModelService class ErrorHandlingService: - """ErrorHandlingService.""" - MESSAGE_NAME = "SystemErrorMessage" - @staticmethod - def set_instance_status(instance_id: int, status: str) -> None: - """Set_instance_status.""" - instance = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == instance_id).first() - if instance: - instance.status = status - db.session.commit() - - def handle_error(self, _processor: ProcessInstanceProcessor, _error: Union[ApiError, Exception]) -> None: + @classmethod + def handle_error(cls, process_instance: ProcessInstanceModel, error: Exception) -> None: """On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception.""" - process_model = ProcessModelService.get_process_model(_processor.process_model_identifier) - # First, suspend or fault the instance - if process_model.fault_or_suspend_on_exception == "suspend": - self.set_instance_status( - _processor.process_instance_model.id, - ProcessInstanceStatus.suspended.value, - ) - else: - # fault is the default - self.set_instance_status( - _processor.process_instance_model.id, - ProcessInstanceStatus.error.value, - ) + process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier) + cls._update_process_instance_in_database(process_instance, process_model.fault_or_suspend_on_exception) # Second, send a bpmn message out, but only if an exception notification address is provided # This will create a new Send Message with correlation keys on the recipients and the message # body. if len(process_model.exception_notification_addresses) > 0: try: - self.handle_system_notification(_error, process_model, _processor) + cls._handle_system_notification(error, process_model, process_instance) except Exception as e: # hmm... what to do if a notification method fails. Probably log, at least current_app.logger.error(e) + @classmethod + def _update_process_instance_in_database( + cls, process_instance: ProcessInstanceModel, fault_or_suspend_on_exception: str + ) -> None: + # First, suspend or fault the instance + if fault_or_suspend_on_exception == "suspend": + cls._set_instance_status( + process_instance, + ProcessInstanceStatus.suspended.value, + ) + else: + # fault is the default + cls._set_instance_status( + process_instance, + ProcessInstanceStatus.error.value, + ) + + db.session.commit() + @staticmethod - def handle_system_notification( - error: Union[ApiError, Exception], + def _handle_system_notification( + error: Exception, process_model: ProcessModelInfo, - _processor: ProcessInstanceProcessor, + process_instance: ProcessInstanceModel, ) -> None: """Send a BPMN Message - which may kick off a waiting process.""" message_text = ( @@ -74,7 +66,7 @@ class ErrorHandlingService: if "user" in g: user_id = g.user.id else: - user_id = _processor.process_instance_model.process_initiator_id + user_id = process_instance.process_initiator_id message_instance = MessageInstanceModel( message_type="send", @@ -85,3 +77,8 @@ class ErrorHandlingService: db.session.add(message_instance) db.session.commit() MessageService.correlate_send_message(message_instance) + + @staticmethod + def _set_instance_status(process_instance: ProcessInstanceModel, status: str) -> None: + process_instance.status = status + db.session.add(process_instance) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 8cd8dfe4..5e63fd09 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,4 +1,6 @@ """Process_instance_processor.""" +# TODO: clean up this service for a clear distinction between it and the process_instance_service +# where this points to the pi service import _strptime # type: ignore import copy import decimal @@ -24,7 +26,6 @@ from uuid import UUID import dateparser import pytz from flask import current_app -from flask import g from lxml import etree # type: ignore from lxml.etree import XMLSyntaxError # type: ignore from RestrictedPython import safe_globals # type: ignore @@ -75,7 +76,6 @@ from spiffworkflow_backend.models.message_instance_correlation import ( ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus -from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.process_instance_metadata import ( ProcessInstanceMetadataModel, @@ -102,9 +102,8 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService from spiffworkflow_backend.services.task_service import JsonDataDict from spiffworkflow_backend.services.task_service import TaskService from spiffworkflow_backend.services.user_service import UserService -from spiffworkflow_backend.services.workflow_execution_service import ( - execution_strategy_named, -) +from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named +from spiffworkflow_backend.services.workflow_execution_service import ExecutionStrategyNotConfiguredError from spiffworkflow_backend.services.workflow_execution_service import ( TaskModelSavingDelegate, ) @@ -157,9 +156,10 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty script: str, context: Dict[str, Any], external_methods: Optional[Dict[str, Any]] = None, - ) -> None: + ) -> bool: super().execute(script, context, external_methods) self._last_result = context + return True def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return {} @@ -212,7 +212,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment) script: str, context: Dict[str, Any], external_methods: Optional[Dict[str, Any]] = None, - ) -> None: + ) -> bool: # TODO: once integrated look at the tests that fail without Box # context is task.data Box.convert_to_box(context) @@ -221,6 +221,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment) self.state.update(context) try: exec(script, self.state) # noqa + return True finally: # since the task data is not directly mutated when the script executes, need to determine which keys # have been deleted from the environment and remove them from task data if present. @@ -313,13 +314,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore # This will overwrite the standard builtins default_globals.update(safe_globals) default_globals["__builtins__"]["__import__"] = _import - environment = CustomScriptEngineEnvironment(default_globals) - - # right now spiff is executing script tasks on ready so doing this - # so we know when something fails and we can save it to our database. - self.failing_spiff_task: Optional[SpiffTask] = None - super().__init__(environment=environment) def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]: @@ -346,7 +341,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore expression: str, external_methods: Optional[dict[str, Any]] = None, ) -> Any: - """Evaluate.""" return self._evaluate(expression, task.data, task, external_methods) def _evaluate( @@ -356,7 +350,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore task: Optional[SpiffTask] = None, external_methods: Optional[Dict[str, Any]] = None, ) -> Any: - """_evaluate.""" methods = self.__get_augment_methods(task) if external_methods: methods.update(external_methods) @@ -376,17 +369,15 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore exception=exception, ) from exception - def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None: - """Execute.""" + def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> bool: try: # reset failing task just in case - self.failing_spiff_task = None methods = self.__get_augment_methods(task) if external_methods: methods.update(external_methods) super().execute(task, script, methods) + return True except WorkflowException as e: - self.failing_spiff_task = task raise e except Exception as e: raise self.create_task_exec_exception(task, script, e) from e @@ -397,7 +388,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore operation_params: Dict[str, Any], task_data: Dict[str, Any], ) -> Any: - """CallService.""" return ServiceTaskDelegate.call_connector(operation_name, operation_params, task_data) @@ -1119,14 +1109,10 @@ class ProcessInstanceProcessor: """Saves the current state of this processor to the database.""" self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION - complete_states = [TaskState.CANCELLED, TaskState.COMPLETED] - user_tasks = list(self.get_all_user_tasks()) self.process_instance_model.status = self.get_status().value current_app.logger.debug( f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}" ) - self.process_instance_model.total_tasks = len(user_tasks) - self.process_instance_model.completed_tasks = sum(1 for t in user_tasks if t.state in complete_states) if self.process_instance_model.start_in_seconds is None: self.process_instance_model.start_in_seconds = round(time.time()) @@ -1318,7 +1304,7 @@ class ProcessInstanceProcessor: db.session.bulk_save_objects(new_task_models.values()) TaskService.insert_or_update_json_data_records(new_json_data_dicts) - self.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id) + TaskService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id) self.save() # Saving the workflow seems to reset the status self.suspend() @@ -1331,7 +1317,7 @@ class ProcessInstanceProcessor: def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None: """Reset a process to an earlier state.""" # raise Exception("This feature to reset a process instance to a given task is currently unavaiable") - cls.add_event_to_process_instance( + TaskService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid ) @@ -1688,6 +1674,10 @@ class ProcessInstanceProcessor: if execution_strategy_name is None: execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"] + if execution_strategy_name is None: + raise ExecutionStrategyNotConfiguredError( + "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set" + ) execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate) execution_service = WorkflowExecutionService( @@ -1697,16 +1687,7 @@ class ProcessInstanceProcessor: self._script_engine.environment.finalize_result, self.save, ) - try: - execution_service.run(exit_at, save) - finally: - # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the - # script engine on a class variable. - if ( - hasattr(self._script_engine, "failing_spiff_task") - and self._script_engine.failing_spiff_task is not None - ): - self._script_engine.failing_spiff_task = None + execution_service.run(exit_at, save) @classmethod def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]: @@ -1861,7 +1842,7 @@ class ProcessInstanceProcessor: TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping) TaskService.insert_or_update_json_data_records(json_data_dict_mapping) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.task_completed.value, task_guid=task_model.guid, @@ -1927,7 +1908,6 @@ class ProcessInstanceProcessor: return [t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec)] def get_all_completed_tasks(self) -> list[SpiffTask]: - """Get_all_completed_tasks.""" all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) return [ t @@ -1960,49 +1940,13 @@ class ProcessInstanceProcessor: return task return None - def get_nav_item(self, task: SpiffTask) -> Any: - """Get_nav_item.""" - for nav_item in self.bpmn_process_instance.get_nav_list(): - if nav_item["task_id"] == task.id: - return nav_item - return None - - def find_spec_and_field(self, spec_name: str, field_id: Union[str, int]) -> Any: - """Tracks down a form field by name in the process_instance spec(s), Returns a tuple of the task, and form.""" - process_instances = [self.bpmn_process_instance] - for task in self.bpmn_process_instance.get_ready_user_tasks(): - if task.process_instance not in process_instances: - process_instances.append(task.process_instance) - for process_instance in process_instances: - for spec in process_instance.spec.task_specs.values(): - if spec.name == spec_name: - if not hasattr(spec, "form"): - raise ApiError( - "invalid_spec", - "The spec name you provided does not contain a form.", - ) - - for field in spec.form.fields: - if field.id == field_id: - return spec, field - - raise ApiError( - "invalid_field", - f"The task '{spec_name}' has no field named '{field_id}'", - ) - - raise ApiError( - "invalid_spec", - f"Unable to find a task in the process_instance called '{spec_name}'", - ) - def terminate(self) -> None: """Terminate.""" self.bpmn_process_instance.cancel() self.save() self.process_instance_model.status = "terminated" db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_terminated.value ) db.session.commit() @@ -2011,7 +1955,7 @@ class ProcessInstanceProcessor: """Suspend.""" self.process_instance_model.status = ProcessInstanceStatus.suspended.value db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_suspended.value ) db.session.commit() @@ -2020,24 +1964,7 @@ class ProcessInstanceProcessor: """Resume.""" self.process_instance_model.status = ProcessInstanceStatus.waiting.value db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value ) db.session.commit() - - @classmethod - def add_event_to_process_instance( - cls, - process_instance: ProcessInstanceModel, - event_type: str, - task_guid: Optional[str] = None, - user_id: Optional[int] = None, - ) -> None: - if user_id is None and hasattr(g, "user") and g.user: - user_id = g.user.id - process_instance_event = ProcessInstanceEventModel( - process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id - ) - if task_guid: - process_instance_event.task_guid = task_guid - db.session.add(process_instance_event) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index dd733816..31e7d725 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -7,12 +7,15 @@ from typing import Optional from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.process_instance_queue import ( ProcessInstanceQueueModel, ) from spiffworkflow_backend.services.process_instance_lock_service import ( ProcessInstanceLockService, ) +from spiffworkflow_backend.services.task_service import TaskService +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError class ProcessInstanceIsNotEnqueuedError(Exception): @@ -24,8 +27,6 @@ class ProcessInstanceIsAlreadyLockedError(Exception): class ProcessInstanceQueueService: - """TODO: comment.""" - @classmethod def _configure_and_save_queue_entry( cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel @@ -99,6 +100,12 @@ class ProcessInstanceQueueService: except Exception as ex: process_instance.status = ProcessInstanceStatus.error.value db.session.add(process_instance) + # these events are handled in the WorkflowExecutionService. + # that is, we don't need to add error_detail records here, etc. + if not isinstance(ex, WorkflowExecutionServiceError): + TaskService.add_event_to_process_instance( + process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex + ) db.session.commit() raise ex finally: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 3a0307f1..3a6111d9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -113,20 +113,9 @@ class ProcessInstanceService: .all() ) for process_instance in records: + current_app.logger.info(f"Processing process_instance {process_instance.id}") try: - current_app.logger.info(f"Processing process_instance {process_instance.id}") - with ProcessInstanceQueueService.dequeued(process_instance): - processor = ProcessInstanceProcessor(process_instance) - if cls.can_optimistically_skip(processor, status_value): - current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") - continue - - db.session.refresh(process_instance) - if process_instance.status == status_value: - execution_strategy_name = current_app.config[ - "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND" - ] - processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + cls.run_process_intance_with_processor(process_instance, status_value=status_value) except ProcessInstanceIsAlreadyLockedError: continue except Exception as e: @@ -137,6 +126,26 @@ class ProcessInstanceService: ) current_app.logger.error(error_message) + @classmethod + def run_process_intance_with_processor( + cls, process_instance: ProcessInstanceModel, status_value: Optional[str] = None + ) -> Optional[ProcessInstanceProcessor]: + processor = None + with ProcessInstanceQueueService.dequeued(process_instance): + processor = ProcessInstanceProcessor(process_instance) + if status_value and cls.can_optimistically_skip(processor, status_value): + current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") + return None + + db.session.refresh(process_instance) + if status_value is None or process_instance.status == status_value: + execution_strategy_name = current_app.config[ + "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND" + ] + processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + + return processor + @staticmethod def processor_to_process_instance_api( processor: ProcessInstanceProcessor, next_task: None = None @@ -155,7 +164,6 @@ class ProcessInstanceService: next_task=None, process_model_identifier=processor.process_model_identifier, process_model_display_name=processor.process_model_display_name, - completed_tasks=processor.process_instance_model.completed_tasks, updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds, ) @@ -322,19 +330,20 @@ class ProcessInstanceService: cls.replace_file_data_with_digest_references(data, models) - @staticmethod + @classmethod def update_form_task_data( - processor: ProcessInstanceProcessor, + cls, + process_instance: ProcessInstanceModel, spiff_task: SpiffTask, data: dict[str, Any], user: UserModel, ) -> None: - AuthorizationService.assert_user_can_complete_spiff_task(processor.process_instance_model.id, spiff_task, user) - ProcessInstanceService.save_file_data_and_replace_with_digest_references( + AuthorizationService.assert_user_can_complete_spiff_task(process_instance.id, spiff_task, user) + cls.save_file_data_and_replace_with_digest_references( data, - processor.process_instance_model.id, + process_instance.id, ) - dot_dct = ProcessInstanceService.create_dot_dict(data) + dot_dct = cls.create_dot_dict(data) spiff_task.update_data(dot_dct) @staticmethod @@ -350,7 +359,7 @@ class ProcessInstanceService: Abstracted here because we need to do it multiple times when completing all tasks in a multi-instance task. """ - ProcessInstanceService.update_form_task_data(processor, spiff_task, data, user) + ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user) # ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store. processor.complete_task(spiff_task, human_task, user=user) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 76880da2..27d56d35 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,6 +1,7 @@ import copy import json import time +import traceback from hashlib import sha256 from typing import Optional from typing import Tuple @@ -8,19 +9,23 @@ from typing import TypedDict from uuid import UUID from flask import current_app +from flask import g from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer +from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskStateNames from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.postgresql import insert as postgres_insert +from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessNotFoundError from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.task import TaskModel # noqa: F401 @@ -112,7 +117,6 @@ class TaskService: def update_task_model_with_spiff_task( self, spiff_task: SpiffTask, - task_failed: bool = False, start_and_end_times: Optional[StartAndEndTimes] = None, ) -> TaskModel: new_bpmn_process = None @@ -153,19 +157,16 @@ class TaskService: task_model.start_in_seconds = start_and_end_times["start_in_seconds"] task_model.end_in_seconds = start_and_end_times["end_in_seconds"] - if task_model.state == "COMPLETED" or task_failed: + # let failed tasks raise and we will log the event then + if task_model.state == "COMPLETED": event_type = ProcessInstanceEventType.task_completed.value - if task_failed: - event_type = ProcessInstanceEventType.task_failed.value - - # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete - # which script tasks execute when READY. timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() - process_instance_event = ProcessInstanceEventModel( + process_instance_event, _process_instance_error_detail = TaskService.add_event_to_process_instance( + self.process_instance, + event_type, task_guid=task_model.guid, - process_instance_id=self.process_instance.id, - event_type=event_type, timestamp=timestamp, + add_to_db_session=False, ) self.process_instance_events[task_model.guid] = process_instance_event @@ -592,3 +593,61 @@ class TaskService: for json_data_dict in json_data_dict_list: if json_data_dict is not None: json_data_dicts[json_data_dict["hash"]] = json_data_dict + + # TODO: move to process_instance_service once we clean it and the processor up + @classmethod + def add_event_to_process_instance( + cls, + process_instance: ProcessInstanceModel, + event_type: str, + task_guid: Optional[str] = None, + user_id: Optional[int] = None, + exception: Optional[Exception] = None, + timestamp: Optional[float] = None, + add_to_db_session: Optional[bool] = True, + ) -> Tuple[ProcessInstanceEventModel, Optional[ProcessInstanceErrorDetailModel]]: + if user_id is None and hasattr(g, "user") and g.user: + user_id = g.user.id + if timestamp is None: + timestamp = time.time() + + process_instance_event = ProcessInstanceEventModel( + process_instance_id=process_instance.id, event_type=event_type, timestamp=timestamp, user_id=user_id + ) + if task_guid: + process_instance_event.task_guid = task_guid + + if add_to_db_session: + db.session.add(process_instance_event) + + process_instance_error_detail = None + if exception is not None: + # truncate to avoid database errors on large values. We observed that text in mysql is 65K. + stacktrace = traceback.format_exc().split("\n") + message = str(exception)[0:1023] + + task_line_number = None + task_line_contents = None + task_trace = None + task_offset = None + if isinstance(exception, WorkflowTaskException) or ( + isinstance(exception, ApiError) and exception.error_code == "task_error" + ): + task_line_number = exception.line_number + task_line_contents = exception.error_line + task_trace = exception.task_trace + task_offset = exception.offset + + process_instance_error_detail = ProcessInstanceErrorDetailModel( + process_instance_event=process_instance_event, + message=message, + stacktrace=stacktrace, + task_line_number=task_line_number, + task_line_contents=task_line_contents, + task_trace=task_trace, + task_offset=task_offset, + ) + + if add_to_db_session: + db.session.add(process_instance_error_detail) + return (process_instance_event, process_instance_error_detail) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 5764cc89..c8e6dd00 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,13 +1,15 @@ +from __future__ import annotations + import copy import time +from abc import abstractmethod from typing import Callable -from typing import Optional -from typing import Set from uuid import UUID from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore +from SpiffWorkflow.exceptions import WorkflowTaskException from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState @@ -18,7 +20,7 @@ from spiffworkflow_backend.models.message_instance_correlation import ( MessageInstanceCorrelationRuleModel, ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401 +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.process_instance_lock_service import ( ProcessInstanceLockService, @@ -27,21 +29,67 @@ from spiffworkflow_backend.services.task_service import StartAndEndTimes from spiffworkflow_backend.services.task_service import TaskService +class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore + @classmethod + def from_workflow_task_exception( + cls, + workflow_task_exception: WorkflowTaskException, + ) -> WorkflowExecutionServiceError: + return cls( + error_msg=str(workflow_task_exception), + task=workflow_task_exception.task, + exception=workflow_task_exception, + line_number=workflow_task_exception.line_number, + offset=workflow_task_exception.offset, + error_line=workflow_task_exception.error_line, + ) + + +class ExecutionStrategyNotConfiguredError(Exception): + pass + + class EngineStepDelegate: """Interface of sorts for a concrete engine step delegate.""" + @abstractmethod def will_complete_task(self, spiff_task: SpiffTask) -> None: pass + @abstractmethod def did_complete_task(self, spiff_task: SpiffTask) -> None: pass + @abstractmethod def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None: pass + @abstractmethod def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: pass + @abstractmethod + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + pass + + +class ExecutionStrategy: + """Interface of sorts for a concrete execution strategy.""" + + def __init__(self, delegate: EngineStepDelegate): + """__init__.""" + self.delegate = delegate + + @abstractmethod + def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: + pass + + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.on_exception(bpmn_process_instance) + + def save(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.save(bpmn_process_instance) + class TaskModelSavingDelegate(EngineStepDelegate): """Engine step delegate that takes care of saving a task model to the database. @@ -54,17 +102,17 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict, - secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, + secondary_engine_step_delegate: EngineStepDelegate | None = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings self.serializer = serializer - self.current_task_start_in_seconds: Optional[float] = None + self.current_task_start_in_seconds: float | None = None - self.last_completed_spiff_task: Optional[SpiffTask] = None - self.spiff_tasks_to_process: Set[UUID] = set() + self.last_completed_spiff_task: SpiffTask | None = None + self.spiff_tasks_to_process: set[UUID] = set() self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {} self.task_service = TaskService( @@ -103,29 +151,12 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.secondary_engine_step_delegate.did_complete_task(spiff_task) def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None: - script_engine = bpmn_process_instance.script_engine - if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: - failing_spiff_task = script_engine.failing_spiff_task - self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) - self.task_service.process_spiff_task_parent_subprocess_tasks(failing_spiff_task) - self.task_service.process_spiff_task_children(failing_spiff_task) - self.task_service.save_objects_to_database() if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) db.session.commit() - def _add_children(self, spiff_task: SpiffTask) -> None: - for child_spiff_task in spiff_task.children: - self.spiff_tasks_to_process.add(child_spiff_task.id) - self._add_children(child_spiff_task) - - def _add_parents(self, spiff_task: SpiffTask) -> None: - if spiff_task.parent and spiff_task.parent.task_spec.name != "Root": - self.spiff_tasks_to_process.add(spiff_task.parent.id) - self._add_parents(spiff_task.parent) - def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: if self._should_update_task_model(): # NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace @@ -138,6 +169,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.MAYBE | TaskState.LIKELY | TaskState.FUTURE + | TaskState.STARTED + | TaskState.ERROR ): # these will be removed from the parent and then ignored if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK): @@ -184,6 +217,19 @@ class TaskModelSavingDelegate(EngineStepDelegate): # self.task_service.process_spiff_task_children(self.last_completed_spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.after_engine_steps(bpmn_process_instance) + + def _add_children(self, spiff_task: SpiffTask) -> None: + for child_spiff_task in spiff_task.children: + self.spiff_tasks_to_process.add(child_spiff_task.id) + self._add_children(child_spiff_task) + + def _add_parents(self, spiff_task: SpiffTask) -> None: + if spiff_task.parent and spiff_task.parent.task_spec.name != "Root": + self.spiff_tasks_to_process.add(spiff_task.parent.id) + self._add_parents(spiff_task.parent) + def _should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. @@ -193,20 +239,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): return True -class ExecutionStrategy: - """Interface of sorts for a concrete execution strategy.""" - - def __init__(self, delegate: EngineStepDelegate): - """__init__.""" - self.delegate = delegate - - def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: - pass - - def save(self, bpmn_process_instance: BpmnWorkflow) -> None: - self.delegate.save(bpmn_process_instance) - - class GreedyExecutionStrategy(ExecutionStrategy): """The common execution strategy. This will greedily run all engine steps without stopping.""" @@ -326,7 +358,17 @@ class WorkflowExecutionService: self.process_bpmn_messages() self.queue_waiting_receive_messages() + except WorkflowTaskException as wte: + TaskService.add_event_to_process_instance( + self.process_instance_model, + ProcessInstanceEventType.task_failed.value, + exception=wte, + task_guid=str(wte.task.id), + ) + self.execution_strategy.on_exception(self.bpmn_process_instance) + raise WorkflowExecutionServiceError.from_workflow_task_exception(wte) from wte except SpiffWorkflowException as swe: + self.execution_strategy.on_exception(self.bpmn_process_instance) raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe finally: diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 30c0bec0..0c3488ea 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2061,7 +2061,7 @@ class TestProcessApi(BaseTest): assert response.status_code == 400 api_error = json.loads(response.get_data(as_text=True)) - assert api_error["error_code"] == "task_error" + assert api_error["error_code"] == "unexpected_workflow_exception" assert 'TypeError:can only concatenate str (not "int") to str' in api_error["message"] process = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == process_instance_id).first() @@ -2141,7 +2141,7 @@ class TestProcessApi(BaseTest): processor = ProcessInstanceProcessor(process_instance) spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance) assert spiff_task is not None - assert spiff_task.state == TaskState.WAITING + assert spiff_task.state == TaskState.ERROR assert spiff_task.data == {"my_var": "THE VAR"} def test_process_model_file_create( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_refresh_permissions.py b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_refresh_permissions.py index 20176dd8..225e870f 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_refresh_permissions.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_refresh_permissions.py @@ -5,22 +5,19 @@ from flask.testing import FlaskClient from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec -from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError class TestRefreshPermissions(BaseTest): - """TestRefreshPermissions.""" - def test_refresh_permissions_requires_elevated_permission( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, ) -> None: - """Test_refresh_permissions_requires_elevated_permission.""" basic_user = self.find_or_create_user("basic_user") privileged_user = self.find_or_create_user("privileged_user") self.add_permissions_to_user( @@ -38,7 +35,7 @@ class TestRefreshPermissions(BaseTest): processor = ProcessInstanceProcessor(process_instance) - with pytest.raises(ApiError) as exception: + with pytest.raises(WorkflowExecutionServiceError) as exception: processor.do_engine_steps(save=True) assert "ScriptUnauthorizedForUserError" in str(exception) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py index adbd2240..33eb86cc 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py @@ -5,7 +5,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from spiffworkflow_backend import db -from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus @@ -19,6 +18,7 @@ from spiffworkflow_backend.services.process_instance_service import ( ProcessInstanceService, ) from spiffworkflow_backend.services.process_model_service import ProcessModelService +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError class TestErrorHandlingService(BaseTest): @@ -34,9 +34,9 @@ class TestErrorHandlingService(BaseTest): process_model.id, user ) pip = ProcessInstanceProcessor(process_instance) - with pytest.raises(ApiError) as e: + with pytest.raises(WorkflowExecutionServiceError) as e: pip.do_engine_steps(save=True) - ErrorHandlingService().handle_error(pip, e.value) + ErrorHandlingService().handle_error(process_instance, e.value) return process_instance def test_handle_error_suspends_or_faults_process( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index d0d4eb73..0dc65e9d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -10,12 +10,12 @@ from SpiffWorkflow.task import TaskState from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec -from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task_definition import TaskDefinitionModel from spiffworkflow_backend.models.user import UserModel @@ -29,6 +29,7 @@ from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_service import ( ProcessInstanceService, ) +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError class TestProcessInstanceProcessor(BaseTest): @@ -713,7 +714,7 @@ class TestProcessInstanceProcessor(BaseTest): spiff_task = processor.get_task_by_guid(human_task_three.task_id) ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_three) - def test_task_data_is_set_even_if_process_instance_errors( + def test_task_data_is_set_even_if_process_instance_errors_and_creates_task_failed_event( self, app: Flask, client: FlaskClient, @@ -731,7 +732,7 @@ class TestProcessInstanceProcessor(BaseTest): ) processor = ProcessInstanceProcessor(process_instance) - with pytest.raises(ApiError): + with pytest.raises(WorkflowExecutionServiceError): processor.do_engine_steps(save=True) process_instance_final = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() @@ -741,5 +742,22 @@ class TestProcessInstanceProcessor(BaseTest): "script_task_two", processor_final.bpmn_process_instance ) assert spiff_task is not None - assert spiff_task.state == TaskState.WAITING + assert spiff_task.state == TaskState.ERROR assert spiff_task.data == {"my_var": "THE VAR"} + + process_instance_events = process_instance.process_instance_events + assert len(process_instance_events) == 4 + error_events = [ + e for e in process_instance_events if e.event_type == ProcessInstanceEventType.task_failed.value + ] + assert len(error_events) == 1 + error_event = error_events[0] + assert error_event.task_guid is not None + process_instance_error_details = error_event.error_details + assert len(process_instance_error_details) == 1 + error_detail = process_instance_error_details[0] + assert error_detail.message == "NameError:name 'hey' is not defined. Did you mean 'my_var'?" + assert error_detail.task_offset is None + assert error_detail.task_line_number == 1 + assert error_detail.task_line_contents == "hey" + assert error_detail.task_trace is not None diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_restricted_script_engine.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_restricted_script_engine.py index 330d115f..d8b90a8a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_restricted_script_engine.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_restricted_script_engine.py @@ -5,24 +5,21 @@ from flask.testing import FlaskClient from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec -from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError -class TestOpenFile(BaseTest): - """TestVariousBpmnConstructs.""" - - def test_dot_notation( +class TestRestrictedScriptEngine(BaseTest): + def test_dot_notation_with_open_file( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel, ) -> None: - """Test_form_data_conversion_to_dot_dict.""" self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") process_model = load_test_spec( "test_group/dangerous", @@ -34,22 +31,17 @@ class TestOpenFile(BaseTest): process_instance = self.create_process_instance_from_process_model(process_model) processor = ProcessInstanceProcessor(process_instance) - with pytest.raises(ApiError) as exception: + with pytest.raises(WorkflowExecutionServiceError) as exception: processor.do_engine_steps(save=True) assert "name 'open' is not defined" in str(exception.value) - -class TestImportModule(BaseTest): - """TestVariousBpmnConstructs.""" - - def test_dot_notation( + def test_dot_notation_with_import_module( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel, ) -> None: - """Test_form_data_conversion_to_dot_dict.""" self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") process_model = load_test_spec( "test_group/dangerous", @@ -61,6 +53,6 @@ class TestImportModule(BaseTest): process_instance = self.create_process_instance_from_process_model(process_model) processor = ProcessInstanceProcessor(process_instance) - with pytest.raises(ApiError) as exception: + with pytest.raises(WorkflowExecutionServiceError) as exception: processor.do_engine_steps(save=True) assert "Import not allowed: os" in str(exception.value) diff --git a/spiffworkflow-frontend/src/components/ErrorDisplay.tsx b/spiffworkflow-frontend/src/components/ErrorDisplay.tsx index 411a9a53..67e6e18d 100644 --- a/spiffworkflow-frontend/src/components/ErrorDisplay.tsx +++ b/spiffworkflow-frontend/src/components/ErrorDisplay.tsx @@ -1,5 +1,10 @@ import { Notification } from './Notification'; import useAPIError from '../hooks/UseApiError'; +import { + ErrorForDisplay, + ProcessInstanceEventErrorDetail, + ProcessInstanceLogEntry, +} from '../interfaces'; function errorDetailDisplay( errorObject: any, @@ -18,57 +23,96 @@ function errorDetailDisplay( return null; } +export const errorForDisplayFromProcessInstanceErrorDetail = ( + processInstanceEvent: ProcessInstanceLogEntry, + processInstanceErrorEventDetail: ProcessInstanceEventErrorDetail +) => { + const errorForDisplay: ErrorForDisplay = { + message: processInstanceErrorEventDetail.message, + messageClassName: 'failure-string', + task_name: processInstanceEvent.task_definition_name, + task_id: processInstanceEvent.task_definition_identifier, + line_number: processInstanceErrorEventDetail.task_line_number, + error_line: processInstanceErrorEventDetail.task_line_contents, + task_trace: processInstanceErrorEventDetail.task_trace, + stacktrace: processInstanceErrorEventDetail.stacktrace, + }; + return errorForDisplay; +}; + +export const childrenForErrorObject = (errorObject: ErrorForDisplay) => { + let sentryLinkTag = null; + if (errorObject.sentry_link) { + sentryLinkTag = ( + + { + ': Find details about this error here (it may take a moment to become available): ' + } + + {errorObject.sentry_link} + + + ); + } + + const message = ( +
+ Stacktrace: + {errorObject.stacktrace.reverse().map((a) => ( + <> + {a} ++ ); + } + + return [ + message, +
+ > + ))} +