From 320d1b4083968ab9237cd00d9bb844899b5953d0 Mon Sep 17 00:00:00 2001 From: jasquat Date: Wed, 19 Apr 2023 11:24:54 -0400 Subject: [PATCH] store process instance errors in the db w/ burnettk --- .../{44a8f46cc508_.py => c95031498e62_.py} | 21 +++++- .../models/process_instance_error_detail.py | 17 +++++ .../models/process_instance_event.py | 4 ++ .../models/process_instance_metadata.py | 3 - .../routes/process_instances_controller.py | 44 ++++++------ .../routes/tasks_controller.py | 4 +- .../services/error_handling_service.py | 66 ++++++++--------- .../services/process_instance_processor.py | 70 +++---------------- .../process_instance_queue_service.py | 5 +- .../services/process_instance_service.py | 15 ++-- .../services/task_service.py | 34 +++++++++ 11 files changed, 153 insertions(+), 130 deletions(-) rename spiffworkflow-backend/migrations/versions/{44a8f46cc508_.py => c95031498e62_.py} (97%) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py diff --git a/spiffworkflow-backend/migrations/versions/44a8f46cc508_.py b/spiffworkflow-backend/migrations/versions/c95031498e62_.py similarity index 97% rename from spiffworkflow-backend/migrations/versions/44a8f46cc508_.py rename to spiffworkflow-backend/migrations/versions/c95031498e62_.py index da70af01..c375745c 100644 --- a/spiffworkflow-backend/migrations/versions/44a8f46cc508_.py +++ b/spiffworkflow-backend/migrations/versions/c95031498e62_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 44a8f46cc508 +Revision ID: c95031498e62 Revises: -Create Date: 2023-04-17 15:40:28.658588 +Create Date: 2023-04-19 10:35:25.813002 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. 
-revision = '44a8f46cc508' +revision = 'c95031498e62' down_revision = None branch_labels = None depends_on = None @@ -463,6 +463,17 @@ def upgrade(): batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'), ['message_instance_id'], unique=False) batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_name'), ['name'], unique=False) + op.create_table('process_instance_error_detail', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_event_id', sa.Integer(), nullable=False), + sa.Column('message', sa.String(length=1024), nullable=False), + sa.Column('stacktrace', sa.Text(), nullable=False), + sa.ForeignKeyConstraint(['process_instance_event_id'], ['process_instance_event.id'], ), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'), ['process_instance_event_id'], unique=False) + op.create_table('human_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False), @@ -486,6 +497,10 @@ def downgrade(): batch_op.drop_index(batch_op.f('ix_human_task_user_human_task_id')) op.drop_table('human_task_user') + with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id')) + + op.drop_table('process_instance_error_detail') with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op: batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_name')) batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id')) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py 
new file mode 100644 index 00000000..91324c77 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_error_detail.py @@ -0,0 +1,17 @@ +from sqlalchemy.orm import relationship +from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +from sqlalchemy import ForeignKey +from spiffworkflow_backend.models.db import db + + +class ProcessInstanceErrorDetailModel(SpiffworkflowBaseDBModel): + __tablename__ = "process_instance_error_detail" + id: int = db.Column(db.Integer, primary_key=True) + + process_instance_event_id: int = db.Column(ForeignKey("process_instance_event.id"), nullable=False, index=True) + process_instance_event = relationship('ProcessInstanceEventModel') + + message: str = db.Column(db.String(1024), nullable=False) + + # this should be 65k in mysql + stacktrace: str = db.Column(db.Text(), nullable=False) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py index fe920b57..936e4d9d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py @@ -1,4 +1,5 @@ from __future__ import annotations +from sqlalchemy.orm import relationship from typing import Any @@ -13,6 +14,7 @@ from spiffworkflow_backend.models.user import UserModel # event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same. 
class ProcessInstanceEventType(SpiffEnum): + process_instance_error = "process_instance_error" process_instance_resumed = "process_instance_resumed" process_instance_rewound_to_task = "process_instance_rewound_to_task" process_instance_suspended = "process_instance_suspended" @@ -37,6 +39,8 @@ class ProcessInstanceEventModel(SpiffworkflowBaseDBModel): user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore + error_details = relationship("ProcessInstanceErrorDetailModel", cascade="delete") # type: ignore + @validates("event_type") def validate_event_type(self, key: str, value: Any) -> Any: return self.validate_enum_field(key, value, ProcessInstanceEventType) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py index 4e00a6ca..ef043a7e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_metadata.py @@ -1,4 +1,3 @@ -"""Process_instance_metadata.""" from dataclasses import dataclass from sqlalchemy import ForeignKey @@ -10,8 +9,6 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel @dataclass class ProcessInstanceMetadataModel(SpiffworkflowBaseDBModel): - """ProcessInstanceMetadataModel.""" - __tablename__ = "process_instance_metadata" __table_args__ = (db.UniqueConstraint("process_instance_id", "key", name="process_instance_metadata_unique"),) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 6813f415..1e6833c6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -112,7 +112,6
@@ def process_instance_create( def process_instance_run( modified_process_model_identifier: str, process_instance_id: int, - do_engine_steps: bool = True, ) -> flask.wrappers.Response: """Process_instance_run.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) @@ -123,22 +122,22 @@ def process_instance_run( status_code=400, ) - processor = ProcessInstanceProcessor(process_instance) - - if do_engine_steps: - try: - processor.do_engine_steps(save=True) - except ( - ApiError, - ProcessInstanceIsNotEnqueuedError, - ProcessInstanceIsAlreadyLockedError, - ) as e: - ErrorHandlingService().handle_error(processor, e) - raise e - except Exception as e: - ErrorHandlingService().handle_error(processor, e) - # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes. - # we need to recurse through all last tasks if the last task is a call activity or subprocess. + processor = None + try: + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + except ( + ApiError, + ProcessInstanceIsNotEnqueuedError, + ProcessInstanceIsAlreadyLockedError, + ) as e: + ErrorHandlingService.handle_error(process_instance, e) + raise e + except Exception as e: + ErrorHandlingService.handle_error(process_instance, e) + # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes. + # we need to recurse through all last tasks if the last task is a call activity or subprocess. 
+ if processor is not None: task = processor.bpmn_process_instance.last_task raise ApiError.from_task( error_code="unknown_exception", @@ -146,9 +145,10 @@ def process_instance_run( status_code=400, task=task, ) from e + raise e - if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: - MessageService.correlate_all_message_instances() + if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: + MessageService.correlate_all_message_instances() process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor) process_instance_data = processor.get_data() @@ -172,7 +172,7 @@ def process_instance_terminate( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -193,7 +193,7 @@ def process_instance_suspend( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -214,7 +214,7 @@ def process_instance_resume( ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - ErrorHandlingService().handle_error(processor, e) + ErrorHandlingService.handle_error(process_instance, e) raise e return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 9baffd25..2a4ffc1d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -215,7 +215,7 @@ def 
task_data_update( ) if json_data_dict is not None: TaskService.insert_or_update_json_data_records({json_data_dict["hash"]: json_data_dict}) - ProcessInstanceProcessor.add_event_to_process_instance( + TaskService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.task_data_edited.value, task_guid=task_guid ) try: @@ -428,7 +428,7 @@ def _task_submit_shared( if save_as_draft: task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id) - ProcessInstanceService.update_form_task_data(processor, spiff_task, body, g.user) + ProcessInstanceService.update_form_task_data(process_instance, spiff_task, body, g.user) json_data_dict = TaskService.update_task_data_on_task_model_and_return_dict_if_updated( task_model, spiff_task.data, "json_data_hash" ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index 4407c6db..6696f353 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -1,5 +1,6 @@ -"""Error_handling_service.""" from typing import Union +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType +from spiffworkflow_backend.services.task_service import TaskService from flask import current_app from flask import g @@ -11,56 +12,52 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.services.message_service import MessageService -from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, -) from spiffworkflow_backend.services.process_model_service import ProcessModelService class 
ErrorHandlingService: - """ErrorHandlingService.""" - MESSAGE_NAME = "SystemErrorMessage" - @staticmethod - def set_instance_status(instance_id: int, status: str) -> None: - """Set_instance_status.""" - instance = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == instance_id).first() - if instance: - instance.status = status - db.session.commit() - - def handle_error(self, _processor: ProcessInstanceProcessor, _error: Union[ApiError, Exception]) -> None: + @classmethod + def handle_error(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception]) -> None: """On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception.""" - process_model = ProcessModelService.get_process_model(_processor.process_model_identifier) - # First, suspend or fault the instance - if process_model.fault_or_suspend_on_exception == "suspend": - self.set_instance_status( - _processor.process_instance_model.id, - ProcessInstanceStatus.suspended.value, - ) - else: - # fault is the default - self.set_instance_status( - _processor.process_instance_model.id, - ProcessInstanceStatus.error.value, - ) + process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier) + cls._update_process_instance_in_database(process_instance, error, process_model.fault_or_suspend_on_exception) # Second, send a bpmn message out, but only if an exception notification address is provided # This will create a new Send Message with correlation keys on the recipients and the message # body. if len(process_model.exception_notification_addresses) > 0: try: - self.handle_system_notification(_error, process_model, _processor) + cls._handle_system_notification(error, process_model, process_instance) except Exception as e: # hmm... what to do if a notification method fails. 
Probably log, at least current_app.logger.error(e) + @classmethod + def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception], fault_or_suspend_on_exception: str) -> None: + TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=error) + + # First, suspend or fault the instance + if fault_or_suspend_on_exception == "suspend": + cls._set_instance_status( + process_instance, + ProcessInstanceStatus.suspended.value, + ) + else: + # fault is the default + cls._set_instance_status( + process_instance, + ProcessInstanceStatus.error.value, + ) + + db.session.commit() + @staticmethod - def handle_system_notification( + def _handle_system_notification( error: Union[ApiError, Exception], process_model: ProcessModelInfo, - _processor: ProcessInstanceProcessor, + process_instance: ProcessInstanceModel, ) -> None: """Send a BPMN Message - which may kick off a waiting process.""" message_text = ( @@ -74,7 +71,7 @@ class ErrorHandlingService: if "user" in g: user_id = g.user.id else: - user_id = _processor.process_instance_model.process_initiator_id + user_id = process_instance.process_initiator_id message_instance = MessageInstanceModel( message_type="send", @@ -85,3 +82,8 @@ class ErrorHandlingService: db.session.add(message_instance) db.session.commit() MessageService.correlate_send_message(message_instance) + + @staticmethod + def _set_instance_status(process_instance: ProcessInstanceModel, status: str) -> None: + process_instance.status = status + db.session.add(process_instance) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 8cd8dfe4..1e2a42f3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ 
b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,5 +1,10 @@ """Process_instance_processor.""" + +# TODO: clean up this service for a clear distinction between it and the process_instance_service +# where this points to the pi service +import traceback import _strptime # type: ignore +from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel import copy import decimal import json @@ -1318,7 +1323,7 @@ class ProcessInstanceProcessor: db.session.bulk_save_objects(new_task_models.values()) TaskService.insert_or_update_json_data_records(new_json_data_dicts) - self.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id) + TaskService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id) self.save() # Saving the workflow seems to reset the status self.suspend() @@ -1331,7 +1336,7 @@ class ProcessInstanceProcessor: def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None: """Reset a process to an earlier state.""" # raise Exception("This feature to reset a process instance to a given task is currently unavaiable") - cls.add_event_to_process_instance( + TaskService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid ) @@ -1861,7 +1866,7 @@ class ProcessInstanceProcessor: TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping) TaskService.insert_or_update_json_data_records(json_data_dict_mapping) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.task_completed.value, task_guid=task_model.guid, @@ -1960,49 +1965,13 @@ class ProcessInstanceProcessor: return task return None - def get_nav_item(self, task: SpiffTask) -> Any: - """Get_nav_item.""" - for nav_item in 
self.bpmn_process_instance.get_nav_list(): - if nav_item["task_id"] == task.id: - return nav_item - return None - - def find_spec_and_field(self, spec_name: str, field_id: Union[str, int]) -> Any: - """Tracks down a form field by name in the process_instance spec(s), Returns a tuple of the task, and form.""" - process_instances = [self.bpmn_process_instance] - for task in self.bpmn_process_instance.get_ready_user_tasks(): - if task.process_instance not in process_instances: - process_instances.append(task.process_instance) - for process_instance in process_instances: - for spec in process_instance.spec.task_specs.values(): - if spec.name == spec_name: - if not hasattr(spec, "form"): - raise ApiError( - "invalid_spec", - "The spec name you provided does not contain a form.", - ) - - for field in spec.form.fields: - if field.id == field_id: - return spec, field - - raise ApiError( - "invalid_field", - f"The task '{spec_name}' has no field named '{field_id}'", - ) - - raise ApiError( - "invalid_spec", - f"Unable to find a task in the process_instance called '{spec_name}'", - ) - def terminate(self) -> None: """Terminate.""" self.bpmn_process_instance.cancel() self.save() self.process_instance_model.status = "terminated" db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_terminated.value ) db.session.commit() @@ -2011,7 +1980,7 @@ class ProcessInstanceProcessor: """Suspend.""" self.process_instance_model.status = ProcessInstanceStatus.suspended.value db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_suspended.value ) db.session.commit() @@ -2020,24 +1989,7 @@ class ProcessInstanceProcessor: """Resume.""" self.process_instance_model.status = ProcessInstanceStatus.waiting.value 
db.session.add(self.process_instance_model) - self.add_event_to_process_instance( + TaskService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value ) db.session.commit() - - @classmethod - def add_event_to_process_instance( - cls, - process_instance: ProcessInstanceModel, - event_type: str, - task_guid: Optional[str] = None, - user_id: Optional[int] = None, - ) -> None: - if user_id is None and hasattr(g, "user") and g.user: - user_id = g.user.id - process_instance_event = ProcessInstanceEventModel( - process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id - ) - if task_guid: - process_instance_event.task_guid = task_guid - db.session.add(process_instance_event) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index dd733816..59915eb1 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -1,5 +1,7 @@ import contextlib import time +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType +from spiffworkflow_backend.services.task_service import TaskService from typing import Generator from typing import List from typing import Optional @@ -24,8 +26,6 @@ class ProcessInstanceIsAlreadyLockedError(Exception): class ProcessInstanceQueueService: - """TODO: comment.""" - @classmethod def _configure_and_save_queue_entry( cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel @@ -99,6 +99,7 @@ class ProcessInstanceQueueService: except Exception as ex: process_instance.status = ProcessInstanceStatus.error.value db.session.add(process_instance) + TaskService.add_event_to_process_instance(process_instance, 
ProcessInstanceEventType.process_instance_error.value, exception=ex) db.session.commit() raise ex finally: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 3a0307f1..c31bb447 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -322,19 +322,20 @@ class ProcessInstanceService: cls.replace_file_data_with_digest_references(data, models) - @staticmethod + @classmethod def update_form_task_data( - processor: ProcessInstanceProcessor, + cls, + process_instance: ProcessInstanceModel, spiff_task: SpiffTask, data: dict[str, Any], user: UserModel, ) -> None: - AuthorizationService.assert_user_can_complete_spiff_task(processor.process_instance_model.id, spiff_task, user) - ProcessInstanceService.save_file_data_and_replace_with_digest_references( + AuthorizationService.assert_user_can_complete_spiff_task(process_instance.id, spiff_task, user) + cls.save_file_data_and_replace_with_digest_references( data, - processor.process_instance_model.id, + process_instance.id, ) - dot_dct = ProcessInstanceService.create_dot_dict(data) + dot_dct = cls.create_dot_dict(data) spiff_task.update_data(dot_dct) @staticmethod @@ -350,7 +351,7 @@ class ProcessInstanceService: Abstracted here because we need to do it multiple times when completing all tasks in a multi-instance task. """ - ProcessInstanceService.update_form_task_data(processor, spiff_task, data, user) + ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user) # ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store. 
processor.complete_task(spiff_task, human_task, user=user) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 76880da2..d8123fd8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,5 +1,8 @@ import copy import json +from flask import g +from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel +import traceback import time from hashlib import sha256 from typing import Optional @@ -592,3 +595,34 @@ class TaskService: for json_data_dict in json_data_dict_list: if json_data_dict is not None: json_data_dicts[json_data_dict["hash"]] = json_data_dict + + # TODO: move to process_instance_service once we clean it and the processor up + @classmethod + def add_event_to_process_instance( + cls, + process_instance: ProcessInstanceModel, + event_type: str, + task_guid: Optional[str] = None, + user_id: Optional[int] = None, + exception: Optional[Exception] = None, + ) -> None: + if user_id is None and hasattr(g, "user") and g.user: + user_id = g.user.id + process_instance_event = ProcessInstanceEventModel( + process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id + ) + if task_guid: + process_instance_event.task_guid = task_guid + db.session.add(process_instance_event) + + if event_type == ProcessInstanceEventType.process_instance_error.value and exception is not None: + # truncate to avoid database errors on large values. We observed that text in mysql is 65K. + stacktrace = traceback.format_exc()[0:63999] + message = str(exception)[0:1023] + + process_instance_error_detail = ProcessInstanceErrorDetailModel( + process_instance_event=process_instance_event, + message=message, + stacktrace=stacktrace, + ) + db.session.add(process_instance_error_detail)