store process instance errors in the db w/ burnettk
This commit is contained in:
parent
eebe987337
commit
320d1b4083
|
@ -1,8 +1,8 @@
|
||||||
"""empty message
|
"""empty message
|
||||||
|
|
||||||
Revision ID: 44a8f46cc508
|
Revision ID: c95031498e62
|
||||||
Revises:
|
Revises:
|
||||||
Create Date: 2023-04-17 15:40:28.658588
|
Create Date: 2023-04-19 10:35:25.813002
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from alembic import op
|
from alembic import op
|
||||||
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
||||||
from sqlalchemy.dialects import mysql
|
from sqlalchemy.dialects import mysql
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision = '44a8f46cc508'
|
revision = 'c95031498e62'
|
||||||
down_revision = None
|
down_revision = None
|
||||||
branch_labels = None
|
branch_labels = None
|
||||||
depends_on = None
|
depends_on = None
|
||||||
|
@ -463,6 +463,17 @@ def upgrade():
|
||||||
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'), ['message_instance_id'], unique=False)
|
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'), ['message_instance_id'], unique=False)
|
||||||
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_name'), ['name'], unique=False)
|
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_name'), ['name'], unique=False)
|
||||||
|
|
||||||
|
op.create_table('process_instance_error_detail',
|
||||||
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('process_instance_event_id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('message', sa.String(length=1024), nullable=False),
|
||||||
|
sa.Column('stacktrace', sa.Text(), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['process_instance_event_id'], ['process_instance_event.id'], ),
|
||||||
|
sa.PrimaryKeyConstraint('id')
|
||||||
|
)
|
||||||
|
with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op:
|
||||||
|
batch_op.create_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'), ['process_instance_event_id'], unique=False)
|
||||||
|
|
||||||
op.create_table('human_task_user',
|
op.create_table('human_task_user',
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
sa.Column('human_task_id', sa.Integer(), nullable=False),
|
sa.Column('human_task_id', sa.Integer(), nullable=False),
|
||||||
|
@ -486,6 +497,10 @@ def downgrade():
|
||||||
batch_op.drop_index(batch_op.f('ix_human_task_user_human_task_id'))
|
batch_op.drop_index(batch_op.f('ix_human_task_user_human_task_id'))
|
||||||
|
|
||||||
op.drop_table('human_task_user')
|
op.drop_table('human_task_user')
|
||||||
|
with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op:
|
||||||
|
batch_op.drop_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'))
|
||||||
|
|
||||||
|
op.drop_table('process_instance_error_detail')
|
||||||
with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
|
with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
|
||||||
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_name'))
|
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_name'))
|
||||||
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'))
|
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'))
|
|
@ -0,0 +1,17 @@
|
||||||
|
from sqlalchemy.orm import relationship
|
||||||
|
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||||
|
from sqlalchemy import ForeignKey
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessInstanceErrorDetailModel(SpiffworkflowBaseDBModel):
|
||||||
|
__tablename__ = "process_instance_error_detail"
|
||||||
|
id: int = db.Column(db.Integer, primary_key=True)
|
||||||
|
|
||||||
|
process_instance_event_id: int = db.Column(ForeignKey("process_instance_event.id"), nullable=False, index=True)
|
||||||
|
process_instance_event = relationship('ProcessInstanceEventModel')
|
||||||
|
|
||||||
|
message: str = db.Column(db.String(1024), nullable=False)
|
||||||
|
|
||||||
|
# this should be 65k in mysql
|
||||||
|
stacktrace: str = db.Column(db.Text(), nullable=False)
|
|
@ -1,4 +1,5 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from sqlalchemy.orm import relationship
|
||||||
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
@ -13,6 +14,7 @@ from spiffworkflow_backend.models.user import UserModel
|
||||||
|
|
||||||
# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same.
|
# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same.
|
||||||
class ProcessInstanceEventType(SpiffEnum):
|
class ProcessInstanceEventType(SpiffEnum):
|
||||||
|
process_instance_error = "process_instance_error"
|
||||||
process_instance_resumed = "process_instance_resumed"
|
process_instance_resumed = "process_instance_resumed"
|
||||||
process_instance_rewound_to_task = "process_instance_rewound_to_task"
|
process_instance_rewound_to_task = "process_instance_rewound_to_task"
|
||||||
process_instance_suspended = "process_instance_suspended"
|
process_instance_suspended = "process_instance_suspended"
|
||||||
|
@ -37,6 +39,8 @@ class ProcessInstanceEventModel(SpiffworkflowBaseDBModel):
|
||||||
|
|
||||||
user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore
|
user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore
|
||||||
|
|
||||||
|
error_deatils = relationship("ProcessInstanceErrorDetailModel", cascade="delete") # type: ignore
|
||||||
|
|
||||||
@validates("event_type")
|
@validates("event_type")
|
||||||
def validate_event_type(self, key: str, value: Any) -> Any:
|
def validate_event_type(self, key: str, value: Any) -> Any:
|
||||||
return self.validate_enum_field(key, value, ProcessInstanceEventType)
|
return self.validate_enum_field(key, value, ProcessInstanceEventType)
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
"""Process_instance_metadata."""
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from sqlalchemy import ForeignKey
|
from sqlalchemy import ForeignKey
|
||||||
|
@ -10,8 +9,6 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ProcessInstanceMetadataModel(SpiffworkflowBaseDBModel):
|
class ProcessInstanceMetadataModel(SpiffworkflowBaseDBModel):
|
||||||
"""ProcessInstanceMetadataModel."""
|
|
||||||
|
|
||||||
__tablename__ = "process_instance_metadata"
|
__tablename__ = "process_instance_metadata"
|
||||||
__table_args__ = (db.UniqueConstraint("process_instance_id", "key", name="process_instance_metadata_unique"),)
|
__table_args__ = (db.UniqueConstraint("process_instance_id", "key", name="process_instance_metadata_unique"),)
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,6 @@ def process_instance_create(
|
||||||
def process_instance_run(
|
def process_instance_run(
|
||||||
modified_process_model_identifier: str,
|
modified_process_model_identifier: str,
|
||||||
process_instance_id: int,
|
process_instance_id: int,
|
||||||
do_engine_steps: bool = True,
|
|
||||||
) -> flask.wrappers.Response:
|
) -> flask.wrappers.Response:
|
||||||
"""Process_instance_run."""
|
"""Process_instance_run."""
|
||||||
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
|
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
|
||||||
|
@ -123,22 +122,22 @@ def process_instance_run(
|
||||||
status_code=400,
|
status_code=400,
|
||||||
)
|
)
|
||||||
|
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = None
|
||||||
|
try:
|
||||||
if do_engine_steps:
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
try:
|
processor.do_engine_steps(save=True)
|
||||||
processor.do_engine_steps(save=True)
|
except (
|
||||||
except (
|
ApiError,
|
||||||
ApiError,
|
ProcessInstanceIsNotEnqueuedError,
|
||||||
ProcessInstanceIsNotEnqueuedError,
|
ProcessInstanceIsAlreadyLockedError,
|
||||||
ProcessInstanceIsAlreadyLockedError,
|
) as e:
|
||||||
) as e:
|
ErrorHandlingService.handle_error(process_instance, e)
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
raise e
|
||||||
raise e
|
except Exception as e:
|
||||||
except Exception as e:
|
ErrorHandlingService.handle_error(process_instance, e)
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
|
||||||
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
|
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
|
||||||
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
|
if processor is not None:
|
||||||
task = processor.bpmn_process_instance.last_task
|
task = processor.bpmn_process_instance.last_task
|
||||||
raise ApiError.from_task(
|
raise ApiError.from_task(
|
||||||
error_code="unknown_exception",
|
error_code="unknown_exception",
|
||||||
|
@ -146,9 +145,10 @@ def process_instance_run(
|
||||||
status_code=400,
|
status_code=400,
|
||||||
task=task,
|
task=task,
|
||||||
) from e
|
) from e
|
||||||
|
raise e
|
||||||
|
|
||||||
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
||||||
MessageService.correlate_all_message_instances()
|
MessageService.correlate_all_message_instances()
|
||||||
|
|
||||||
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
|
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
|
||||||
process_instance_data = processor.get_data()
|
process_instance_data = processor.get_data()
|
||||||
|
@ -172,7 +172,7 @@ def process_instance_terminate(
|
||||||
ProcessInstanceIsNotEnqueuedError,
|
ProcessInstanceIsNotEnqueuedError,
|
||||||
ProcessInstanceIsAlreadyLockedError,
|
ProcessInstanceIsAlreadyLockedError,
|
||||||
) as e:
|
) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService.handle_error(process_instance, e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
@ -193,7 +193,7 @@ def process_instance_suspend(
|
||||||
ProcessInstanceIsNotEnqueuedError,
|
ProcessInstanceIsNotEnqueuedError,
|
||||||
ProcessInstanceIsAlreadyLockedError,
|
ProcessInstanceIsAlreadyLockedError,
|
||||||
) as e:
|
) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService.handle_error(process_instance, e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
@ -214,7 +214,7 @@ def process_instance_resume(
|
||||||
ProcessInstanceIsNotEnqueuedError,
|
ProcessInstanceIsNotEnqueuedError,
|
||||||
ProcessInstanceIsAlreadyLockedError,
|
ProcessInstanceIsAlreadyLockedError,
|
||||||
) as e:
|
) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService.handle_error(process_instance, e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
|
|
@ -215,7 +215,7 @@ def task_data_update(
|
||||||
)
|
)
|
||||||
if json_data_dict is not None:
|
if json_data_dict is not None:
|
||||||
TaskService.insert_or_update_json_data_records({json_data_dict["hash"]: json_data_dict})
|
TaskService.insert_or_update_json_data_records({json_data_dict["hash"]: json_data_dict})
|
||||||
ProcessInstanceProcessor.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
process_instance, ProcessInstanceEventType.task_data_edited.value, task_guid=task_guid
|
process_instance, ProcessInstanceEventType.task_data_edited.value, task_guid=task_guid
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
@ -428,7 +428,7 @@ def _task_submit_shared(
|
||||||
|
|
||||||
if save_as_draft:
|
if save_as_draft:
|
||||||
task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id)
|
task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id)
|
||||||
ProcessInstanceService.update_form_task_data(processor, spiff_task, body, g.user)
|
ProcessInstanceService.update_form_task_data(process_instance, spiff_task, body, g.user)
|
||||||
json_data_dict = TaskService.update_task_data_on_task_model_and_return_dict_if_updated(
|
json_data_dict = TaskService.update_task_data_on_task_model_and_return_dict_if_updated(
|
||||||
task_model, spiff_task.data, "json_data_hash"
|
task_model, spiff_task.data, "json_data_hash"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
"""Error_handling_service."""
|
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||||
|
from spiffworkflow_backend.services.task_service import TaskService
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from flask import g
|
from flask import g
|
||||||
|
@ -11,56 +12,52 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||||
from spiffworkflow_backend.services.message_service import MessageService
|
from spiffworkflow_backend.services.message_service import MessageService
|
||||||
from spiffworkflow_backend.services.process_instance_processor import (
|
|
||||||
ProcessInstanceProcessor,
|
|
||||||
)
|
|
||||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||||
|
|
||||||
|
|
||||||
class ErrorHandlingService:
|
class ErrorHandlingService:
|
||||||
"""ErrorHandlingService."""
|
|
||||||
|
|
||||||
MESSAGE_NAME = "SystemErrorMessage"
|
MESSAGE_NAME = "SystemErrorMessage"
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def set_instance_status(instance_id: int, status: str) -> None:
|
def handle_error(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception]) -> None:
|
||||||
"""Set_instance_status."""
|
|
||||||
instance = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == instance_id).first()
|
|
||||||
if instance:
|
|
||||||
instance.status = status
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
def handle_error(self, _processor: ProcessInstanceProcessor, _error: Union[ApiError, Exception]) -> None:
|
|
||||||
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
|
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
|
||||||
process_model = ProcessModelService.get_process_model(_processor.process_model_identifier)
|
process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier)
|
||||||
# First, suspend or fault the instance
|
cls._update_process_instance_in_database(process_instance, error, process_model.fault_or_suspend_on_exception)
|
||||||
if process_model.fault_or_suspend_on_exception == "suspend":
|
|
||||||
self.set_instance_status(
|
|
||||||
_processor.process_instance_model.id,
|
|
||||||
ProcessInstanceStatus.suspended.value,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# fault is the default
|
|
||||||
self.set_instance_status(
|
|
||||||
_processor.process_instance_model.id,
|
|
||||||
ProcessInstanceStatus.error.value,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Second, send a bpmn message out, but only if an exception notification address is provided
|
# Second, send a bpmn message out, but only if an exception notification address is provided
|
||||||
# This will create a new Send Message with correlation keys on the recipients and the message
|
# This will create a new Send Message with correlation keys on the recipients and the message
|
||||||
# body.
|
# body.
|
||||||
if len(process_model.exception_notification_addresses) > 0:
|
if len(process_model.exception_notification_addresses) > 0:
|
||||||
try:
|
try:
|
||||||
self.handle_system_notification(_error, process_model, _processor)
|
cls._handle_system_notification(error, process_model, process_instance)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# hmm... what to do if a notification method fails. Probably log, at least
|
# hmm... what to do if a notification method fails. Probably log, at least
|
||||||
current_app.logger.error(e)
|
current_app.logger.error(e)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception], fault_or_suspend_on_exception: str) -> None:
|
||||||
|
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=error)
|
||||||
|
|
||||||
|
# First, suspend or fault the instance
|
||||||
|
if fault_or_suspend_on_exception == "suspend":
|
||||||
|
cls._set_instance_status(
|
||||||
|
process_instance,
|
||||||
|
ProcessInstanceStatus.suspended.value,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# fault is the default
|
||||||
|
cls._set_instance_status(
|
||||||
|
process_instance,
|
||||||
|
ProcessInstanceStatus.error.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle_system_notification(
|
def _handle_system_notification(
|
||||||
error: Union[ApiError, Exception],
|
error: Union[ApiError, Exception],
|
||||||
process_model: ProcessModelInfo,
|
process_model: ProcessModelInfo,
|
||||||
_processor: ProcessInstanceProcessor,
|
process_instance: ProcessInstanceModel,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Send a BPMN Message - which may kick off a waiting process."""
|
"""Send a BPMN Message - which may kick off a waiting process."""
|
||||||
message_text = (
|
message_text = (
|
||||||
|
@ -74,7 +71,7 @@ class ErrorHandlingService:
|
||||||
if "user" in g:
|
if "user" in g:
|
||||||
user_id = g.user.id
|
user_id = g.user.id
|
||||||
else:
|
else:
|
||||||
user_id = _processor.process_instance_model.process_initiator_id
|
user_id = process_instance.process_initiator_id
|
||||||
|
|
||||||
message_instance = MessageInstanceModel(
|
message_instance = MessageInstanceModel(
|
||||||
message_type="send",
|
message_type="send",
|
||||||
|
@ -85,3 +82,8 @@ class ErrorHandlingService:
|
||||||
db.session.add(message_instance)
|
db.session.add(message_instance)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
MessageService.correlate_send_message(message_instance)
|
MessageService.correlate_send_message(message_instance)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _set_instance_status(process_instance: ProcessInstanceModel, status: str) -> None:
|
||||||
|
process_instance.status = status
|
||||||
|
db.session.add(process_instance)
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
"""Process_instance_processor."""
|
"""Process_instance_processor."""
|
||||||
|
|
||||||
|
# TODO: clean up this service for a clear distinction between it and the process_instance_service
|
||||||
|
# where this points to the pi service
|
||||||
|
import traceback
|
||||||
import _strptime # type: ignore
|
import _strptime # type: ignore
|
||||||
|
from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
|
||||||
import copy
|
import copy
|
||||||
import decimal
|
import decimal
|
||||||
import json
|
import json
|
||||||
|
@ -1318,7 +1323,7 @@ class ProcessInstanceProcessor:
|
||||||
db.session.bulk_save_objects(new_task_models.values())
|
db.session.bulk_save_objects(new_task_models.values())
|
||||||
TaskService.insert_or_update_json_data_records(new_json_data_dicts)
|
TaskService.insert_or_update_json_data_records(new_json_data_dicts)
|
||||||
|
|
||||||
self.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id)
|
TaskService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id)
|
||||||
self.save()
|
self.save()
|
||||||
# Saving the workflow seems to reset the status
|
# Saving the workflow seems to reset the status
|
||||||
self.suspend()
|
self.suspend()
|
||||||
|
@ -1331,7 +1336,7 @@ class ProcessInstanceProcessor:
|
||||||
def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
|
def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
|
||||||
"""Reset a process to an earlier state."""
|
"""Reset a process to an earlier state."""
|
||||||
# raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
|
# raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
|
||||||
cls.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
|
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1861,7 +1866,7 @@ class ProcessInstanceProcessor:
|
||||||
TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping)
|
TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping)
|
||||||
TaskService.insert_or_update_json_data_records(json_data_dict_mapping)
|
TaskService.insert_or_update_json_data_records(json_data_dict_mapping)
|
||||||
|
|
||||||
self.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
self.process_instance_model,
|
self.process_instance_model,
|
||||||
ProcessInstanceEventType.task_completed.value,
|
ProcessInstanceEventType.task_completed.value,
|
||||||
task_guid=task_model.guid,
|
task_guid=task_model.guid,
|
||||||
|
@ -1960,49 +1965,13 @@ class ProcessInstanceProcessor:
|
||||||
return task
|
return task
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_nav_item(self, task: SpiffTask) -> Any:
|
|
||||||
"""Get_nav_item."""
|
|
||||||
for nav_item in self.bpmn_process_instance.get_nav_list():
|
|
||||||
if nav_item["task_id"] == task.id:
|
|
||||||
return nav_item
|
|
||||||
return None
|
|
||||||
|
|
||||||
def find_spec_and_field(self, spec_name: str, field_id: Union[str, int]) -> Any:
|
|
||||||
"""Tracks down a form field by name in the process_instance spec(s), Returns a tuple of the task, and form."""
|
|
||||||
process_instances = [self.bpmn_process_instance]
|
|
||||||
for task in self.bpmn_process_instance.get_ready_user_tasks():
|
|
||||||
if task.process_instance not in process_instances:
|
|
||||||
process_instances.append(task.process_instance)
|
|
||||||
for process_instance in process_instances:
|
|
||||||
for spec in process_instance.spec.task_specs.values():
|
|
||||||
if spec.name == spec_name:
|
|
||||||
if not hasattr(spec, "form"):
|
|
||||||
raise ApiError(
|
|
||||||
"invalid_spec",
|
|
||||||
"The spec name you provided does not contain a form.",
|
|
||||||
)
|
|
||||||
|
|
||||||
for field in spec.form.fields:
|
|
||||||
if field.id == field_id:
|
|
||||||
return spec, field
|
|
||||||
|
|
||||||
raise ApiError(
|
|
||||||
"invalid_field",
|
|
||||||
f"The task '{spec_name}' has no field named '{field_id}'",
|
|
||||||
)
|
|
||||||
|
|
||||||
raise ApiError(
|
|
||||||
"invalid_spec",
|
|
||||||
f"Unable to find a task in the process_instance called '{spec_name}'",
|
|
||||||
)
|
|
||||||
|
|
||||||
def terminate(self) -> None:
|
def terminate(self) -> None:
|
||||||
"""Terminate."""
|
"""Terminate."""
|
||||||
self.bpmn_process_instance.cancel()
|
self.bpmn_process_instance.cancel()
|
||||||
self.save()
|
self.save()
|
||||||
self.process_instance_model.status = "terminated"
|
self.process_instance_model.status = "terminated"
|
||||||
db.session.add(self.process_instance_model)
|
db.session.add(self.process_instance_model)
|
||||||
self.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
self.process_instance_model, ProcessInstanceEventType.process_instance_terminated.value
|
self.process_instance_model, ProcessInstanceEventType.process_instance_terminated.value
|
||||||
)
|
)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
@ -2011,7 +1980,7 @@ class ProcessInstanceProcessor:
|
||||||
"""Suspend."""
|
"""Suspend."""
|
||||||
self.process_instance_model.status = ProcessInstanceStatus.suspended.value
|
self.process_instance_model.status = ProcessInstanceStatus.suspended.value
|
||||||
db.session.add(self.process_instance_model)
|
db.session.add(self.process_instance_model)
|
||||||
self.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
self.process_instance_model, ProcessInstanceEventType.process_instance_suspended.value
|
self.process_instance_model, ProcessInstanceEventType.process_instance_suspended.value
|
||||||
)
|
)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
@ -2020,24 +1989,7 @@ class ProcessInstanceProcessor:
|
||||||
"""Resume."""
|
"""Resume."""
|
||||||
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
|
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
|
||||||
db.session.add(self.process_instance_model)
|
db.session.add(self.process_instance_model)
|
||||||
self.add_event_to_process_instance(
|
TaskService.add_event_to_process_instance(
|
||||||
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
|
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
|
||||||
)
|
)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def add_event_to_process_instance(
|
|
||||||
cls,
|
|
||||||
process_instance: ProcessInstanceModel,
|
|
||||||
event_type: str,
|
|
||||||
task_guid: Optional[str] = None,
|
|
||||||
user_id: Optional[int] = None,
|
|
||||||
) -> None:
|
|
||||||
if user_id is None and hasattr(g, "user") and g.user:
|
|
||||||
user_id = g.user.id
|
|
||||||
process_instance_event = ProcessInstanceEventModel(
|
|
||||||
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id
|
|
||||||
)
|
|
||||||
if task_guid:
|
|
||||||
process_instance_event.task_guid = task_guid
|
|
||||||
db.session.add(process_instance_event)
|
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import contextlib
|
import contextlib
|
||||||
import time
|
import time
|
||||||
|
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||||
|
from spiffworkflow_backend.services.task_service import TaskService
|
||||||
from typing import Generator
|
from typing import Generator
|
||||||
from typing import List
|
from typing import List
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
@ -24,8 +26,6 @@ class ProcessInstanceIsAlreadyLockedError(Exception):
|
||||||
|
|
||||||
|
|
||||||
class ProcessInstanceQueueService:
|
class ProcessInstanceQueueService:
|
||||||
"""TODO: comment."""
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _configure_and_save_queue_entry(
|
def _configure_and_save_queue_entry(
|
||||||
cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
|
cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
|
||||||
|
@ -99,6 +99,7 @@ class ProcessInstanceQueueService:
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
process_instance.status = ProcessInstanceStatus.error.value
|
process_instance.status = ProcessInstanceStatus.error.value
|
||||||
db.session.add(process_instance)
|
db.session.add(process_instance)
|
||||||
|
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
raise ex
|
raise ex
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -322,19 +322,20 @@ class ProcessInstanceService:
|
||||||
|
|
||||||
cls.replace_file_data_with_digest_references(data, models)
|
cls.replace_file_data_with_digest_references(data, models)
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def update_form_task_data(
|
def update_form_task_data(
|
||||||
processor: ProcessInstanceProcessor,
|
cls,
|
||||||
|
process_instance: ProcessInstanceModel,
|
||||||
spiff_task: SpiffTask,
|
spiff_task: SpiffTask,
|
||||||
data: dict[str, Any],
|
data: dict[str, Any],
|
||||||
user: UserModel,
|
user: UserModel,
|
||||||
) -> None:
|
) -> None:
|
||||||
AuthorizationService.assert_user_can_complete_spiff_task(processor.process_instance_model.id, spiff_task, user)
|
AuthorizationService.assert_user_can_complete_spiff_task(process_instance.id, spiff_task, user)
|
||||||
ProcessInstanceService.save_file_data_and_replace_with_digest_references(
|
cls.save_file_data_and_replace_with_digest_references(
|
||||||
data,
|
data,
|
||||||
processor.process_instance_model.id,
|
process_instance.id,
|
||||||
)
|
)
|
||||||
dot_dct = ProcessInstanceService.create_dot_dict(data)
|
dot_dct = cls.create_dot_dict(data)
|
||||||
spiff_task.update_data(dot_dct)
|
spiff_task.update_data(dot_dct)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -350,7 +351,7 @@ class ProcessInstanceService:
|
||||||
Abstracted here because we need to do it multiple times when completing all tasks in
|
Abstracted here because we need to do it multiple times when completing all tasks in
|
||||||
a multi-instance task.
|
a multi-instance task.
|
||||||
"""
|
"""
|
||||||
ProcessInstanceService.update_form_task_data(processor, spiff_task, data, user)
|
ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
|
||||||
# ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store.
|
# ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store.
|
||||||
processor.complete_task(spiff_task, human_task, user=user)
|
processor.complete_task(spiff_task, human_task, user=user)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
|
from flask import g
|
||||||
|
from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
|
||||||
|
import traceback
|
||||||
import time
|
import time
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
@ -592,3 +595,34 @@ class TaskService:
|
||||||
for json_data_dict in json_data_dict_list:
|
for json_data_dict in json_data_dict_list:
|
||||||
if json_data_dict is not None:
|
if json_data_dict is not None:
|
||||||
json_data_dicts[json_data_dict["hash"]] = json_data_dict
|
json_data_dicts[json_data_dict["hash"]] = json_data_dict
|
||||||
|
|
||||||
|
# TODO: move to process_instance_service once we clean it and the processor up
|
||||||
|
@classmethod
|
||||||
|
def add_event_to_process_instance(
|
||||||
|
cls,
|
||||||
|
process_instance: ProcessInstanceModel,
|
||||||
|
event_type: str,
|
||||||
|
task_guid: Optional[str] = None,
|
||||||
|
user_id: Optional[int] = None,
|
||||||
|
exception: Optional[Exception] = None,
|
||||||
|
) -> None:
|
||||||
|
if user_id is None and hasattr(g, "user") and g.user:
|
||||||
|
user_id = g.user.id
|
||||||
|
process_instance_event = ProcessInstanceEventModel(
|
||||||
|
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id
|
||||||
|
)
|
||||||
|
if task_guid:
|
||||||
|
process_instance_event.task_guid = task_guid
|
||||||
|
db.session.add(process_instance_event)
|
||||||
|
|
||||||
|
if event_type == ProcessInstanceEventType.process_instance_error.value and exception is not None:
|
||||||
|
# truncate to avoid database errors on large values. We observed that text in mysql is 65K.
|
||||||
|
stacktrace = traceback.format_exc()[0:63999]
|
||||||
|
message = str(exception)[0:1023]
|
||||||
|
|
||||||
|
process_instance_error_detail = ProcessInstanceErrorDetailModel(
|
||||||
|
process_instance_event=process_instance_event,
|
||||||
|
message=message,
|
||||||
|
stacktrace=stacktrace,
|
||||||
|
)
|
||||||
|
db.session.add(process_instance_error_detail)
|
||||||
|
|
Loading…
Reference in New Issue