Merging with main.

Dan 2023-04-20 16:06:22 -04:00
commit a6adb98a19
39 changed files with 3295 additions and 2716 deletions


@ -87,7 +87,7 @@ jobs:
uses: actions/checkout@v3.3.0
- name: Set up Python ${{ matrix.python }}
uses: actions/setup-python@v4.2.0
uses: actions/setup-python@v4.6.0
with:
python-version: ${{ matrix.python }}
@ -195,7 +195,7 @@ jobs:
- name: Check out the repository
uses: actions/checkout@v3.3.0
- name: Set up Python
uses: actions/setup-python@v4.2.0
uses: actions/setup-python@v4.6.0
with:
python-version: "3.11"
- name: Install Poetry
@ -236,7 +236,7 @@ jobs:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v4.2.0
uses: actions/setup-python@v4.6.0
with:
python-version: "3.11"


@ -1,8 +1,8 @@
"""empty message
Revision ID: 44a8f46cc508
Revision ID: 0c7428378d6e
Revises:
Create Date: 2023-04-17 15:40:28.658588
Create Date: 2023-04-20 14:05:44.779453
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '44a8f46cc508'
revision = '0c7428378d6e'
down_revision = None
branch_labels = None
depends_on = None
@ -84,6 +84,15 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uri')
)
op.create_table('process_caller_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_identifier', sa.String(length=255), nullable=True),
sa.Column('calling_process_identifier', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('process_caller_cache', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_process_caller_cache_process_identifier'), ['process_identifier'], unique=False)
op.create_table('spec_reference_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('identifier', sa.String(length=255), nullable=True),
@ -463,6 +472,21 @@ def upgrade():
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'), ['message_instance_id'], unique=False)
batch_op.create_index(batch_op.f('ix_message_instance_correlation_rule_name'), ['name'], unique=False)
op.create_table('process_instance_error_detail',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_event_id', sa.Integer(), nullable=False),
sa.Column('message', sa.String(length=1024), nullable=False),
sa.Column('stacktrace', sa.JSON(), nullable=False),
sa.Column('task_line_number', sa.Integer(), nullable=True),
sa.Column('task_offset', sa.Integer(), nullable=True),
sa.Column('task_line_contents', sa.String(length=255), nullable=True),
sa.Column('task_trace', sa.JSON(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_event_id'], ['process_instance_event.id'], ),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'), ['process_instance_event_id'], unique=False)
op.create_table('human_task_user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('human_task_id', sa.Integer(), nullable=False),
@ -486,6 +510,10 @@ def downgrade():
batch_op.drop_index(batch_op.f('ix_human_task_user_human_task_id'))
op.drop_table('human_task_user')
with op.batch_alter_table('process_instance_error_detail', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_process_instance_error_detail_process_instance_event_id'))
op.drop_table('process_instance_error_detail')
with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_name'))
batch_op.drop_index(batch_op.f('ix_message_instance_correlation_rule_message_instance_id'))
@ -607,6 +635,10 @@ def downgrade():
batch_op.drop_index(batch_op.f('ix_spec_reference_cache_display_name'))
op.drop_table('spec_reference_cache')
with op.batch_alter_table('process_caller_cache', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_process_caller_cache_process_identifier'))
op.drop_table('process_caller_cache')
op.drop_table('permission_target')
with op.batch_alter_table('message_triggerable_process_model', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_message_triggerable_process_model_process_model_identifier'))

File diff suppressed because it is too large


@ -519,6 +519,31 @@ paths:
schema:
type: string
/processes/{bpmn_process_identifier}/callers:
parameters:
- name: bpmn_process_identifier
in: path
required: true
description: the bpmn process identifier
schema:
type: string
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_caller_lists
summary:
Return a list of information about all processes that call the provided process id
tags:
- Process Models
responses:
"200":
description: Successfully return the requested calling processes
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/Process"
/processes:
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_list
@ -2031,6 +2056,39 @@ paths:
schema:
$ref: "#/components/schemas/ProcessInstanceLog"
/event-error-details/{modified_process_model_identifier}/{process_instance_id}/{process_instance_event_id}:
parameters:
- name: process_instance_id
in: path
required: true
description: the id of the process instance
schema:
type: integer
- name: modified_process_model_identifier
in: path
required: true
description: The process_model_id, modified to replace slashes (/)
schema:
type: string
- name: process_instance_event_id
in: path
required: true
description: the id of the process instance event
schema:
type: integer
get:
tags:
- Process Instance Events
operationId: spiffworkflow_backend.routes.process_instance_events_controller.error_details
summary: Returns the error details for a given process instance event.
responses:
"200":
description: Successfully returned the error details for the process instance event
content:
application/json:
schema:
$ref: "#/components/schemas/ProcessInstanceLog"
/secrets:
parameters:
- name: page
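
For illustration, the two endpoints added above can be exercised over HTTP once a backend is running. A minimal sketch using requests; the base URL, bearer token, and the path identifiers are assumptions, not part of this commit:

import requests

BASE = "http://localhost:7000/v1.0"  # assumed local backend address
HEADERS = {"Authorization": "Bearer <access_token>"}  # assumed auth header

# GET /processes/{bpmn_process_identifier}/callers returns a SpecReference
# record for every process that calls the given process id.
response = requests.get(f"{BASE}/processes/Level2/callers", headers=HEADERS)
for reference in response.json():
    print(reference["identifier"])

# GET /event-error-details/... returns the stored error detail for one
# process instance event; the three path values here are made up.
response = requests.get(f"{BASE}/event-error-details/misc:example:model/42/7", headers=HEADERS)
detail = response.json()
print(detail["message"], detail["task_line_number"])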


@ -21,6 +21,9 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel # noqa: F401
from spiffworkflow_backend.models.spec_reference import (
SpecReferenceCache,
) # noqa: F401
from spiffworkflow_backend.models.process_caller import (
ProcessCallerCacheModel,
) # noqa: F401
from spiffworkflow_backend.models.message_instance import (
MessageInstanceModel,
) # noqa: F401


@ -0,0 +1,12 @@
"""ProcessCaller_model."""
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class ProcessCallerCacheModel(SpiffworkflowBaseDBModel):
"""A cache of calling process ids for all Processes defined in all files."""
__tablename__ = "process_caller_cache"
id = db.Column(db.Integer, primary_key=True)
process_identifier = db.Column(db.String(255), index=True)
calling_process_identifier = db.Column(db.String(255))


@ -229,7 +229,6 @@ class ProcessInstanceApiSchema(Schema):
"next_task",
"process_model_identifier",
"process_model_display_name",
"completed_tasks",
"updated_at_in_seconds",
]
unknown = INCLUDE
@ -246,7 +245,6 @@ class ProcessInstanceApiSchema(Schema):
"next_task",
"process_model_identifier",
"process_model_display_name",
"completed_tasks",
"updated_at_in_seconds",
]
filtered_fields = {key: data[key] for key in keys}


@ -0,0 +1,27 @@
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@dataclass
class ProcessInstanceErrorDetailModel(SpiffworkflowBaseDBModel):
__tablename__ = "process_instance_error_detail"
id: int = db.Column(db.Integer, primary_key=True)
process_instance_event_id: int = db.Column(ForeignKey("process_instance_event.id"), nullable=False, index=True)
process_instance_event = relationship("ProcessInstanceEventModel") # type: ignore
message: str = db.Column(db.String(1024), nullable=False)
# this should be 65k in mysql
stacktrace: Optional[list] = db.Column(db.JSON, nullable=False)
task_line_number: Optional[int] = db.Column(db.Integer)
task_offset: Optional[int] = db.Column(db.Integer)
task_line_contents: Optional[str] = db.Column(db.String(255))
task_trace: Optional[list] = db.Column(db.JSON)
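
A minimal sketch of creating one of these records by hand, assuming a Flask app context and an existing process instance with id 1 (all values are illustrative):

import time

from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance_error_detail import (
    ProcessInstanceErrorDetailModel,
)
from spiffworkflow_backend.models.process_instance_event import (
    ProcessInstanceEventModel,
    ProcessInstanceEventType,
)

event = ProcessInstanceEventModel(
    process_instance_id=1,  # assumed existing process instance
    event_type=ProcessInstanceEventType.process_instance_error.value,
    timestamp=time.time(),
)
detail = ProcessInstanceErrorDetailModel(
    process_instance_event=event,  # the relationship fills in process_instance_event_id
    message="KeyError: 'my_var'",
    stacktrace=["Traceback (most recent call last):", "KeyError: 'my_var'"],
    task_line_number=3,
)
db.session.add_all([event, detail])
db.session.commit()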


@ -3,6 +3,7 @@ from __future__ import annotations
from typing import Any
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum
@ -13,6 +14,7 @@ from spiffworkflow_backend.models.user import UserModel
# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same.
class ProcessInstanceEventType(SpiffEnum):
process_instance_error = "process_instance_error"
process_instance_resumed = "process_instance_resumed"
process_instance_rewound_to_task = "process_instance_rewound_to_task"
process_instance_suspended = "process_instance_suspended"
@ -37,6 +39,10 @@ class ProcessInstanceEventModel(SpiffworkflowBaseDBModel):
user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore
error_details = relationship(
"ProcessInstanceErrorDetailModel", back_populates="process_instance_event", cascade="delete"
) # type: ignore
@validates("event_type")
def validate_event_type(self, key: str, value: Any) -> Any:
return self.validate_enum_field(key, value, ProcessInstanceEventType)


@ -1,4 +1,3 @@
"""Process_instance_metadata."""
from dataclasses import dataclass
from sqlalchemy import ForeignKey
@ -10,8 +9,6 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@dataclass
class ProcessInstanceMetadataModel(SpiffworkflowBaseDBModel):
"""ProcessInstanceMetadataModel."""
__tablename__ = "process_instance_metadata"
__table_args__ = (db.UniqueConstraint("process_instance_id", "key", name="process_instance_metadata_unique"),)


@ -35,6 +35,7 @@ class SpecReference:
messages: dict # Any messages defined in the same file where this process is defined.
correlations: dict # Any correlations defined in the same file with this process.
start_messages: list # The names of any messages that would start this process.
called_element_ids: list # The element ids of any called elements
class SpecReferenceCache(SpiffworkflowBaseDBModel):


@ -28,6 +28,7 @@ from spiffworkflow_backend.models.spec_reference import SpecReferenceSchema
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -77,6 +78,14 @@ def process_list() -> Any:
return SpecReferenceSchema(many=True).dump(references)
def process_caller_lists(bpmn_process_identifier: str) -> Any:
callers = ProcessCallerService.callers(bpmn_process_identifier)
references = (
SpecReferenceCache.query.filter_by(type="process").filter(SpecReferenceCache.identifier.in_(callers)).all()
)
return SpecReferenceSchema(many=True).dump(references)
def _process_data_fetcher(
process_instance_id: int,
process_data_identifier: str,


@ -5,6 +5,7 @@ from flask import jsonify
from flask import make_response
from sqlalchemy import and_
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
@ -91,3 +92,20 @@ def types() -> flask.wrappers.Response:
task_types = [t.typename for t in query]
event_types = ProcessInstanceEventType.list()
return make_response(jsonify({"task_types": task_types, "event_types": event_types}), 200)
def error_details(
modified_process_model_identifier: str,
process_instance_id: int,
process_instance_event_id: int,
) -> flask.wrappers.Response:
process_instance_event = ProcessInstanceEventModel.query.filter_by(id=process_instance_event_id).first()
if process_instance_event is None:
raise (
ApiError(
error_code="process_instance_event_cannot_be_found",
message=f"Process instance event cannot be found: {process_instance_event_id}",
status_code=400,
)
)
return make_response(jsonify(process_instance_event.error_details[0]), 200)


@ -112,7 +112,6 @@ def process_instance_create(
def process_instance_run(
modified_process_model_identifier: str,
process_instance_id: int,
do_engine_steps: bool = True,
) -> flask.wrappers.Response:
"""Process_instance_run."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -123,22 +122,21 @@ def process_instance_run(
status_code=400,
)
processor = ProcessInstanceProcessor(process_instance)
if do_engine_steps:
try:
processor.do_engine_steps(save=True)
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
raise e
except Exception as e:
ErrorHandlingService().handle_error(processor, e)
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
processor = None
try:
processor = ProcessInstanceService.run_process_intance_with_processor(process_instance)
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService.handle_error(process_instance, e)
raise e
except Exception as e:
ErrorHandlingService.handle_error(process_instance, e)
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
if processor is not None:
task = processor.bpmn_process_instance.last_task
raise ApiError.from_task(
error_code="unknown_exception",
@ -146,15 +144,22 @@ def process_instance_run(
status_code=400,
task=task,
) from e
raise e
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances()
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances()
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
process_instance_data = processor.get_data()
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
# for mypy
if processor is not None:
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
process_instance_data = processor.get_data()
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
# FIXME: this should never happen currently but it'd be ideal to always do this
# currently though it does not return next task so it cannot be used to take the user to the next human task
return make_response(jsonify(process_instance), 200)
def process_instance_terminate(
@ -172,7 +177,7 @@ def process_instance_terminate(
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
ErrorHandlingService.handle_error(process_instance, e)
raise e
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
@ -193,7 +198,7 @@ def process_instance_suspend(
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
ErrorHandlingService.handle_error(process_instance, e)
raise e
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
@ -214,7 +219,7 @@ def process_instance_resume(
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
ErrorHandlingService.handle_error(process_instance, e)
raise e
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")


@ -217,7 +217,7 @@ def task_data_update(
)
if json_data_dict is not None:
TaskService.insert_or_update_json_data_records({json_data_dict["hash"]: json_data_dict})
ProcessInstanceProcessor.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.task_data_edited.value, task_guid=task_guid
)
try:
@ -469,7 +469,7 @@ def _task_submit_shared(
if save_as_draft:
task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id)
ProcessInstanceService.update_form_task_data(processor, spiff_task, body, g.user)
ProcessInstanceService.update_form_task_data(process_instance, spiff_task, body, g.user)
json_data_dict = TaskService.update_task_data_on_task_model_and_return_dict_if_updated(
task_model, spiff_task.data, "json_data_hash"
)


@ -1,66 +1,58 @@
"""Error_handling_service."""
from typing import Union
from flask import current_app
from flask import g
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
class ErrorHandlingService:
"""ErrorHandlingService."""
MESSAGE_NAME = "SystemErrorMessage"
@staticmethod
def set_instance_status(instance_id: int, status: str) -> None:
"""Set_instance_status."""
instance = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == instance_id).first()
if instance:
instance.status = status
db.session.commit()
def handle_error(self, _processor: ProcessInstanceProcessor, _error: Union[ApiError, Exception]) -> None:
@classmethod
def handle_error(cls, process_instance: ProcessInstanceModel, error: Exception) -> None:
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
process_model = ProcessModelService.get_process_model(_processor.process_model_identifier)
# First, suspend or fault the instance
if process_model.fault_or_suspend_on_exception == "suspend":
self.set_instance_status(
_processor.process_instance_model.id,
ProcessInstanceStatus.suspended.value,
)
else:
# fault is the default
self.set_instance_status(
_processor.process_instance_model.id,
ProcessInstanceStatus.error.value,
)
process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier)
cls._update_process_instance_in_database(process_instance, process_model.fault_or_suspend_on_exception)
# Second, send a bpmn message out, but only if an exception notification address is provided
# This will create a new Send Message with correlation keys on the recipients and the message
# body.
if len(process_model.exception_notification_addresses) > 0:
try:
self.handle_system_notification(_error, process_model, _processor)
cls._handle_system_notification(error, process_model, process_instance)
except Exception as e:
# hmm... what to do if a notification method fails. Probably log, at least
current_app.logger.error(e)
@classmethod
def _update_process_instance_in_database(
cls, process_instance: ProcessInstanceModel, fault_or_suspend_on_exception: str
) -> None:
# First, suspend or fault the instance
if fault_or_suspend_on_exception == "suspend":
cls._set_instance_status(
process_instance,
ProcessInstanceStatus.suspended.value,
)
else:
# fault is the default
cls._set_instance_status(
process_instance,
ProcessInstanceStatus.error.value,
)
db.session.commit()
@staticmethod
def handle_system_notification(
error: Union[ApiError, Exception],
def _handle_system_notification(
error: Exception,
process_model: ProcessModelInfo,
_processor: ProcessInstanceProcessor,
process_instance: ProcessInstanceModel,
) -> None:
"""Send a BPMN Message - which may kick off a waiting process."""
message_text = (
@ -74,7 +66,7 @@ class ErrorHandlingService:
if "user" in g:
user_id = g.user.id
else:
user_id = _processor.process_instance_model.process_initiator_id
user_id = process_instance.process_initiator_id
message_instance = MessageInstanceModel(
message_type="send",
@ -85,3 +77,8 @@ class ErrorHandlingService:
db.session.add(message_instance)
db.session.commit()
MessageService.correlate_send_message(message_instance)
@staticmethod
def _set_instance_status(process_instance: ProcessInstanceModel, status: str) -> None:
process_instance.status = status
db.session.add(process_instance)
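
With handle_error now a classmethod that takes the process instance directly, callers no longer need a ProcessInstanceProcessor. A hedged usage sketch; do_work is a hypothetical stand-in for whatever raises:

from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService

def do_work(process_instance: ProcessInstanceModel) -> None:
    raise RuntimeError("simulated engine failure")  # hypothetical stand-in

def run_with_error_handling(process_instance: ProcessInstanceModel) -> None:
    try:
        do_work(process_instance)
    except Exception as error:
        # faults or suspends the instance per the process model's
        # fault_or_suspend_on_exception setting, then sends the
        # SystemErrorMessage BPMN message if notification addresses are set
        ErrorHandlingService.handle_error(process_instance, error)
        raise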


@ -0,0 +1,42 @@
from typing import List
from sqlalchemy import or_
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_caller import ProcessCallerCacheModel
class ProcessCallerService:
@staticmethod
def count() -> int:
return ProcessCallerCacheModel.query.count() # type: ignore
@staticmethod
def clear_cache() -> None:
db.session.query(ProcessCallerCacheModel).delete()
@staticmethod
def clear_cache_for_process_ids(process_ids: List[str]) -> None:
db.session.query(ProcessCallerCacheModel).filter(
or_(
ProcessCallerCacheModel.process_identifier.in_(process_ids),
ProcessCallerCacheModel.calling_process_identifier.in_(process_ids),
)
).delete()
@staticmethod
def add_caller(process_id: str, called_process_ids: List[str]) -> None:
for called_process_id in called_process_ids:
db.session.add(
ProcessCallerCacheModel(process_identifier=called_process_id, calling_process_identifier=process_id)
)
db.session.commit()
@staticmethod
def callers(process_id: str) -> List[str]:
records = (
db.session.query(ProcessCallerCacheModel)
.filter(ProcessCallerCacheModel.process_identifier == process_id)
.all()
)
return list(set(map(lambda r: r.calling_process_identifier, records))) # type: ignore
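
A usage sketch for the new service, assuming a Flask app context; the process identifiers are illustrative:

from spiffworkflow_backend.services.process_caller_service import ProcessCallerService

# record that Level1 calls Level2 and Level3
ProcessCallerService.add_caller("Level1", ["Level2", "Level3"])
assert ProcessCallerService.callers("Level2") == ["Level1"]
assert ProcessCallerService.count() == 2

# clearing by id removes rows where the id appears on either side of the call
ProcessCallerService.clear_cache_for_process_ids(["Level1"])
assert ProcessCallerService.count() == 0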


@ -1,4 +1,6 @@
"""Process_instance_processor."""
# TODO: clean up this service for a clear distinction between it and the process_instance_service
# where this points to the pi service
import _strptime # type: ignore
import copy
import decimal
@ -24,7 +26,6 @@ from uuid import UUID
import dateparser
import pytz
from flask import current_app
from flask import g
from lxml import etree # type: ignore
from lxml.etree import XMLSyntaxError # type: ignore
from RestrictedPython import safe_globals # type: ignore
@ -75,7 +76,6 @@ from spiffworkflow_backend.models.message_instance_correlation import (
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel,
@ -102,9 +102,8 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.task_service import JsonDataDict
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import (
execution_strategy_named,
)
from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named
from spiffworkflow_backend.services.workflow_execution_service import ExecutionStrategyNotConfiguredError
from spiffworkflow_backend.services.workflow_execution_service import (
TaskModelSavingDelegate,
)
@ -157,9 +156,10 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
super().execute(script, context, external_methods)
self._last_result = context
return True
def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return {}
@ -212,7 +212,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
# TODO: once integrated look at the tests that fail without Box
# context is task.data
Box.convert_to_box(context)
@ -221,6 +221,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
self.state.update(context)
try:
exec(script, self.state) # noqa
return True
finally:
# since the task data is not directly mutated when the script executes, need to determine which keys
# have been deleted from the environment and remove them from task data if present.
@ -313,13 +314,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
# This will overwrite the standard builtins
default_globals.update(safe_globals)
default_globals["__builtins__"]["__import__"] = _import
environment = CustomScriptEngineEnvironment(default_globals)
# right now spiff is executing script tasks on ready so doing this
# so we know when something fails and we can save it to our database.
self.failing_spiff_task: Optional[SpiffTask] = None
super().__init__(environment=environment)
def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]:
@ -346,7 +341,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
expression: str,
external_methods: Optional[dict[str, Any]] = None,
) -> Any:
"""Evaluate."""
return self._evaluate(expression, task.data, task, external_methods)
def _evaluate(
@ -356,7 +350,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
task: Optional[SpiffTask] = None,
external_methods: Optional[Dict[str, Any]] = None,
) -> Any:
"""_evaluate."""
methods = self.__get_augment_methods(task)
if external_methods:
methods.update(external_methods)
@ -376,17 +369,15 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
exception=exception,
) from exception
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None:
"""Execute."""
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> bool:
try:
# reset failing task just in case
self.failing_spiff_task = None
methods = self.__get_augment_methods(task)
if external_methods:
methods.update(external_methods)
super().execute(task, script, methods)
return True
except WorkflowException as e:
self.failing_spiff_task = task
raise e
except Exception as e:
raise self.create_task_exec_exception(task, script, e) from e
@ -397,7 +388,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
operation_params: Dict[str, Any],
task_data: Dict[str, Any],
) -> Any:
"""CallService."""
return ServiceTaskDelegate.call_connector(operation_name, operation_params, task_data)
@ -1119,14 +1109,10 @@ class ProcessInstanceProcessor:
"""Saves the current state of this processor to the database."""
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
user_tasks = list(self.get_all_user_tasks())
self.process_instance_model.status = self.get_status().value
current_app.logger.debug(
f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}"
)
self.process_instance_model.total_tasks = len(user_tasks)
self.process_instance_model.completed_tasks = sum(1 for t in user_tasks if t.state in complete_states)
if self.process_instance_model.start_in_seconds is None:
self.process_instance_model.start_in_seconds = round(time.time())
@ -1318,7 +1304,7 @@ class ProcessInstanceProcessor:
db.session.bulk_save_objects(new_task_models.values())
TaskService.insert_or_update_json_data_records(new_json_data_dicts)
self.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id)
TaskService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id)
self.save()
# Saving the workflow seems to reset the status
self.suspend()
@ -1331,7 +1317,7 @@ class ProcessInstanceProcessor:
def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
"""Reset a process to an earlier state."""
# raise Exception("This feature to reset a process instance to a given task is currently unavailable")
cls.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
)
@ -1688,6 +1674,10 @@ class ProcessInstanceProcessor:
if execution_strategy_name is None:
execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"]
if execution_strategy_name is None:
raise ExecutionStrategyNotConfiguredError(
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set"
)
execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate)
execution_service = WorkflowExecutionService(
@ -1697,16 +1687,7 @@ class ProcessInstanceProcessor:
self._script_engine.environment.finalize_result,
self.save,
)
try:
execution_service.run_and_save(exit_at, save)
finally:
# clear out failing spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable.
if (
hasattr(self._script_engine, "failing_spiff_task")
and self._script_engine.failing_spiff_task is not None
):
self._script_engine.failing_spiff_task = None
execution_service.run_and_save(exit_at, save)
@classmethod
def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
@ -1861,7 +1842,7 @@ class ProcessInstanceProcessor:
TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping)
TaskService.insert_or_update_json_data_records(json_data_dict_mapping)
self.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
self.process_instance_model,
ProcessInstanceEventType.task_completed.value,
task_guid=task_model.guid,
@ -1927,7 +1908,6 @@ class ProcessInstanceProcessor:
return [t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec)]
def get_all_completed_tasks(self) -> list[SpiffTask]:
"""Get_all_completed_tasks."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [
t
@ -1960,49 +1940,13 @@ class ProcessInstanceProcessor:
return task
return None
def get_nav_item(self, task: SpiffTask) -> Any:
"""Get_nav_item."""
for nav_item in self.bpmn_process_instance.get_nav_list():
if nav_item["task_id"] == task.id:
return nav_item
return None
def find_spec_and_field(self, spec_name: str, field_id: Union[str, int]) -> Any:
"""Tracks down a form field by name in the process_instance spec(s), Returns a tuple of the task, and form."""
process_instances = [self.bpmn_process_instance]
for task in self.bpmn_process_instance.get_ready_user_tasks():
if task.process_instance not in process_instances:
process_instances.append(task.process_instance)
for process_instance in process_instances:
for spec in process_instance.spec.task_specs.values():
if spec.name == spec_name:
if not hasattr(spec, "form"):
raise ApiError(
"invalid_spec",
"The spec name you provided does not contain a form.",
)
for field in spec.form.fields:
if field.id == field_id:
return spec, field
raise ApiError(
"invalid_field",
f"The task '{spec_name}' has no field named '{field_id}'",
)
raise ApiError(
"invalid_spec",
f"Unable to find a task in the process_instance called '{spec_name}'",
)
def terminate(self) -> None:
"""Terminate."""
self.bpmn_process_instance.cancel()
self.save()
self.process_instance_model.status = "terminated"
db.session.add(self.process_instance_model)
self.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_terminated.value
)
db.session.commit()
@ -2011,7 +1955,7 @@ class ProcessInstanceProcessor:
"""Suspend."""
self.process_instance_model.status = ProcessInstanceStatus.suspended.value
db.session.add(self.process_instance_model)
self.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_suspended.value
)
db.session.commit()
@ -2020,24 +1964,7 @@ class ProcessInstanceProcessor:
"""Resume."""
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
db.session.add(self.process_instance_model)
self.add_event_to_process_instance(
TaskService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
)
db.session.commit()
@classmethod
def add_event_to_process_instance(
cls,
process_instance: ProcessInstanceModel,
event_type: str,
task_guid: Optional[str] = None,
user_id: Optional[int] = None,
) -> None:
if user_id is None and hasattr(g, "user") and g.user:
user_id = g.user.id
process_instance_event = ProcessInstanceEventModel(
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id
)
if task_guid:
process_instance_event.task_guid = task_guid
db.session.add(process_instance_event)


@ -7,12 +7,15 @@ from typing import Optional
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.process_instance_queue import (
ProcessInstanceQueueModel,
)
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
class ProcessInstanceIsNotEnqueuedError(Exception):
@ -24,8 +27,6 @@ class ProcessInstanceIsAlreadyLockedError(Exception):
class ProcessInstanceQueueService:
"""TODO: comment."""
@classmethod
def _configure_and_save_queue_entry(
cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
@ -99,6 +100,12 @@ class ProcessInstanceQueueService:
except Exception as ex:
process_instance.status = ProcessInstanceStatus.error.value
db.session.add(process_instance)
# these events are handled in the WorkflowExecutionService.
# that is, we don't need to add error_detail records here, etc.
if not isinstance(ex, WorkflowExecutionServiceError):
TaskService.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex
)
db.session.commit()
raise ex
finally:


@ -116,20 +116,9 @@ class ProcessInstanceService:
.all()
)
for process_instance in records:
current_app.logger.info(f"Processing process_instance {process_instance.id}")
try:
current_app.logger.info(f"Processing process_instance {process_instance.id}")
with ProcessInstanceQueueService.dequeued(process_instance):
processor = ProcessInstanceProcessor(process_instance)
if cls.can_optimistically_skip(processor, status_value):
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
continue
db.session.refresh(process_instance)
if process_instance.status == status_value:
execution_strategy_name = current_app.config[
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
]
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
cls.run_process_intance_with_processor(process_instance, status_value=status_value)
except ProcessInstanceIsAlreadyLockedError:
continue
except Exception as e:
@ -140,6 +129,26 @@ class ProcessInstanceService:
)
current_app.logger.error(error_message)
@classmethod
def run_process_intance_with_processor(
cls, process_instance: ProcessInstanceModel, status_value: Optional[str] = None
) -> Optional[ProcessInstanceProcessor]:
processor = None
with ProcessInstanceQueueService.dequeued(process_instance):
processor = ProcessInstanceProcessor(process_instance)
if status_value and cls.can_optimistically_skip(processor, status_value):
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
return None
db.session.refresh(process_instance)
if status_value is None or process_instance.status == status_value:
execution_strategy_name = current_app.config[
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
]
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
return processor
@staticmethod
def processor_to_process_instance_api(
processor: ProcessInstanceProcessor, next_task: None = None
@ -324,19 +333,20 @@ class ProcessInstanceService:
cls.replace_file_data_with_digest_references(data, models)
@staticmethod
@classmethod
def update_form_task_data(
processor: ProcessInstanceProcessor,
cls,
process_instance: ProcessInstanceModel,
spiff_task: SpiffTask,
data: dict[str, Any],
user: UserModel,
) -> None:
AuthorizationService.assert_user_can_complete_spiff_task(processor.process_instance_model.id, spiff_task, user)
ProcessInstanceService.save_file_data_and_replace_with_digest_references(
AuthorizationService.assert_user_can_complete_spiff_task(process_instance.id, spiff_task, user)
cls.save_file_data_and_replace_with_digest_references(
data,
processor.process_instance_model.id,
process_instance.id,
)
dot_dct = ProcessInstanceService.create_dot_dict(data)
dot_dct = cls.create_dot_dict(data)
spiff_task.update_data(dot_dct)
@staticmethod
@ -352,7 +362,7 @@ class ProcessInstanceService:
Abstracted here because we need to do it multiple times when completing all tasks in
a multi-instance task.
"""
ProcessInstanceService.update_form_task_data(processor, spiff_task, data, user)
ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
# ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store.
processor.complete_task(spiff_task, human_task, user=user)
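
The new runner (its name carries the "intance" typo as committed) centralizes the dequeue/skip/run logic shared by the background processor and the run endpoint. A sketch of calling it directly, assuming a Flask app context:

from typing import Optional

from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService

def run_and_serialize(process_instance: ProcessInstanceModel) -> Optional[dict]:
    # status_value=None runs unconditionally; passing a status lets background
    # runs optimistically skip instances that have already moved on
    processor = ProcessInstanceService.run_process_intance_with_processor(process_instance)
    if processor is None:
        return None
    api = ProcessInstanceService.processor_to_process_instance_api(processor)
    return {"api": api, "data": processor.get_data()}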


@ -22,6 +22,7 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -112,6 +113,7 @@ class SpecFileService(FileSystemService):
messages = {}
correlations = {}
start_messages = []
called_element_ids = []
if file_type.value == FileType.bpmn.value:
parser.add_bpmn_xml(cls.get_etree_from_xml_bytes(binary_data))
parser_type = "process"
@ -130,6 +132,7 @@ class SpecFileService(FileSystemService):
is_executable = sub_parser.process_executable
start_messages = sub_parser.start_messages()
is_primary = sub_parser.get_id() == process_model_info.primary_process_id
called_element_ids = sub_parser.called_element_ids()
references.append(
SpecReference(
@ -145,6 +148,7 @@ class SpecFileService(FileSystemService):
is_primary=is_primary,
correlations=correlations,
start_messages=start_messages,
called_element_ids=called_element_ids,
)
)
return references
@ -258,6 +262,7 @@ class SpecFileService(FileSystemService):
def update_caches(ref: SpecReference) -> None:
"""Update_caches."""
SpecFileService.update_process_cache(ref)
SpecFileService.update_process_caller_cache(ref)
SpecFileService.update_message_cache(ref)
SpecFileService.update_message_trigger_cache(ref)
SpecFileService.update_correlation_cache(ref)
@ -265,15 +270,27 @@ class SpecFileService(FileSystemService):
@staticmethod
def clear_caches_for_file(file_name: str, process_model_info: ProcessModelInfo) -> None:
"""Clear all caches related to a file."""
db.session.query(SpecReferenceCache).filter(SpecReferenceCache.file_name == file_name).filter(
SpecReferenceCache.process_model_id == process_model_info.id
).delete()
records = (
db.session.query(SpecReferenceCache)
.filter(SpecReferenceCache.file_name == file_name)
.filter(SpecReferenceCache.process_model_id == process_model_info.id)
.all()
)
process_ids = []
for record in records:
process_ids.append(record.identifier)
db.session.delete(record)
ProcessCallerService.clear_cache_for_process_ids(process_ids)
# fixme: likely the other caches should be cleared as well, but we don't have a clean way to do so yet.
@staticmethod
def clear_caches() -> None:
"""Clear_caches."""
db.session.query(SpecReferenceCache).delete()
ProcessCallerService.clear_cache()
# fixme: likely the other caches should be cleared as well, but we don't have a clean way to do so yet.
@staticmethod
@ -301,6 +318,10 @@ class SpecFileService(FileSystemService):
db.session.add(process_id_lookup)
db.session.commit()
@staticmethod
def update_process_caller_cache(ref: SpecReference) -> None:
ProcessCallerService.add_caller(ref.identifier, ref.called_element_ids)
@staticmethod
def update_message_cache(ref: SpecReference) -> None:
"""Assure we have a record in the database of all possible message ids and names."""


@ -1,6 +1,7 @@
import copy
import json
import time
import traceback
from hashlib import sha256
from typing import Optional
from typing import Tuple
@ -8,19 +9,23 @@ from typing import TypedDict
from uuid import UUID
from flask import current_app
from flask import g
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.task import TaskStateNames
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process import BpmnProcessNotFoundError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
@ -112,7 +117,6 @@ class TaskService:
def update_task_model_with_spiff_task(
self,
spiff_task: SpiffTask,
task_failed: bool = False,
start_and_end_times: Optional[StartAndEndTimes] = None,
) -> TaskModel:
new_bpmn_process = None
@ -153,19 +157,16 @@ class TaskService:
task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
task_model.end_in_seconds = start_and_end_times["end_in_seconds"]
if task_model.state == "COMPLETED" or task_failed:
# let failed tasks raise and we will log the event then
if task_model.state == "COMPLETED":
event_type = ProcessInstanceEventType.task_completed.value
if task_failed:
event_type = ProcessInstanceEventType.task_failed.value
# FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
# which script tasks execute when READY.
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event = ProcessInstanceEventModel(
process_instance_event, _process_instance_error_detail = TaskService.add_event_to_process_instance(
self.process_instance,
event_type,
task_guid=task_model.guid,
process_instance_id=self.process_instance.id,
event_type=event_type,
timestamp=timestamp,
add_to_db_session=False,
)
self.process_instance_events[task_model.guid] = process_instance_event
@ -592,3 +593,61 @@ class TaskService:
for json_data_dict in json_data_dict_list:
if json_data_dict is not None:
json_data_dicts[json_data_dict["hash"]] = json_data_dict
# TODO: move to process_instance_service once we clean it and the processor up
@classmethod
def add_event_to_process_instance(
cls,
process_instance: ProcessInstanceModel,
event_type: str,
task_guid: Optional[str] = None,
user_id: Optional[int] = None,
exception: Optional[Exception] = None,
timestamp: Optional[float] = None,
add_to_db_session: Optional[bool] = True,
) -> Tuple[ProcessInstanceEventModel, Optional[ProcessInstanceErrorDetailModel]]:
if user_id is None and hasattr(g, "user") and g.user:
user_id = g.user.id
if timestamp is None:
timestamp = time.time()
process_instance_event = ProcessInstanceEventModel(
process_instance_id=process_instance.id, event_type=event_type, timestamp=timestamp, user_id=user_id
)
if task_guid:
process_instance_event.task_guid = task_guid
if add_to_db_session:
db.session.add(process_instance_event)
process_instance_error_detail = None
if exception is not None:
# truncate to avoid database errors on large values. We observed that text in mysql is 65K.
stacktrace = traceback.format_exc().split("\n")
message = str(exception)[0:1023]
task_line_number = None
task_line_contents = None
task_trace = None
task_offset = None
if isinstance(exception, WorkflowTaskException) or (
isinstance(exception, ApiError) and exception.error_code == "task_error"
):
task_line_number = exception.line_number
task_line_contents = exception.error_line
task_trace = exception.task_trace
task_offset = exception.offset
process_instance_error_detail = ProcessInstanceErrorDetailModel(
process_instance_event=process_instance_event,
message=message,
stacktrace=stacktrace,
task_line_number=task_line_number,
task_line_contents=task_line_contents,
task_trace=task_trace,
task_offset=task_offset,
)
if add_to_db_session:
db.session.add(process_instance_error_detail)
return (process_instance_event, process_instance_error_detail)
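
A sketch of recording an error event together with its detail row via the consolidated helper, assuming a Flask app context:

from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.services.task_service import TaskService

def record_error(process_instance: ProcessInstanceModel, exception: Exception) -> None:
    event, error_detail = TaskService.add_event_to_process_instance(
        process_instance,
        ProcessInstanceEventType.process_instance_error.value,
        exception=exception,
    )
    db.session.commit()
    # the detail row carries the truncated message and the formatted stacktrace
    assert error_detail is not None
    assert error_detail.message == str(exception)[0:1023]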


@ -1,5 +1,8 @@
from __future__ import annotations
import copy
import time
from abc import abstractmethod
from typing import Callable
from typing import List
from typing import Optional
@ -9,6 +12,7 @@ from uuid import UUID
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.exceptions import WorkflowTaskException
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
@ -19,7 +23,7 @@ from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
@ -28,21 +32,75 @@ from spiffworkflow_backend.services.task_service import StartAndEndTimes
from spiffworkflow_backend.services.task_service import TaskService
class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore
@classmethod
def from_workflow_task_exception(
cls,
workflow_task_exception: WorkflowTaskException,
) -> WorkflowExecutionServiceError:
return cls(
error_msg=str(workflow_task_exception),
task=workflow_task_exception.task,
exception=workflow_task_exception,
line_number=workflow_task_exception.line_number,
offset=workflow_task_exception.offset,
error_line=workflow_task_exception.error_line,
)
class ExecutionStrategyNotConfiguredError(Exception):
pass
class EngineStepDelegate:
"""Interface of sorts for a concrete engine step delegate."""
@abstractmethod
def will_complete_task(self, spiff_task: SpiffTask) -> None:
pass
@abstractmethod
def did_complete_task(self, spiff_task: SpiffTask) -> None:
pass
@abstractmethod
def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None:
pass
@abstractmethod
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
@abstractmethod
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
"""__init__."""
self.delegate = delegate
@abstractmethod
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.on_exception(bpmn_process_instance)
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
return list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
@ -55,17 +113,17 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
secondary_engine_step_delegate: EngineStepDelegate | None = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
self.serializer = serializer
self.current_task_start_in_seconds: Optional[float] = None
self.current_task_start_in_seconds: float | None = None
self.last_completed_spiff_task: Optional[SpiffTask] = None
self.spiff_tasks_to_process: Set[UUID] = set()
self.last_completed_spiff_task: SpiffTask | None = None
self.spiff_tasks_to_process: set[UUID] = set()
self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}
self.task_service = TaskService(
@ -104,29 +162,12 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None:
script_engine = bpmn_process_instance.script_engine
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
failing_spiff_task = script_engine.failing_spiff_task
self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
self.task_service.process_spiff_task_parent_subprocess_tasks(failing_spiff_task)
self.task_service.process_spiff_task_children(failing_spiff_task)
self.task_service.save_objects_to_database()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
db.session.commit()
def _add_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self.spiff_tasks_to_process.add(child_spiff_task.id)
self._add_children(child_spiff_task)
def _add_parents(self, spiff_task: SpiffTask) -> None:
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
self.spiff_tasks_to_process.add(spiff_task.parent.id)
self._add_parents(spiff_task.parent)
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model():
# NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace
@ -139,6 +180,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
| TaskState.MAYBE
| TaskState.LIKELY
| TaskState.FUTURE
| TaskState.STARTED
| TaskState.ERROR
):
# these will be removed from the parent and then ignored
if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK):
@ -185,6 +228,19 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# self.task_service.process_spiff_task_children(self.last_completed_spiff_task)
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.after_engine_steps(bpmn_process_instance)
def _add_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self.spiff_tasks_to_process.add(child_spiff_task.id)
self._add_children(child_spiff_task)
def _add_parents(self, spiff_task: SpiffTask) -> None:
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
self.spiff_tasks_to_process.add(spiff_task.parent.id)
self._add_parents(spiff_task.parent)
def _should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance.
@ -194,29 +250,6 @@ class TaskModelSavingDelegate(EngineStepDelegate):
return True
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
"""__init__."""
self.delegate = delegate
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
return list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping."""
@ -363,7 +396,17 @@ class WorkflowExecutionService:
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except WorkflowTaskException as wte:
TaskService.add_event_to_process_instance(
self.process_instance_model,
ProcessInstanceEventType.task_failed.value,
exception=wte,
task_guid=str(wte.task.id),
)
self.execution_strategy.on_exception(self.bpmn_process_instance)
raise WorkflowExecutionServiceError.from_workflow_task_exception(wte) from wte
except SpiffWorkflowException as swe:
self.execution_strategy.on_exception(self.bpmn_process_instance)
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:
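
Call sites can now catch the engine's own error type, which carries the failing task's context, instead of a generic ApiError. A sketch, assuming a ProcessInstanceProcessor is already in hand:

from spiffworkflow_backend.services.process_instance_processor import (
    ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.workflow_execution_service import (
    WorkflowExecutionServiceError,
)

def run_and_report(processor: ProcessInstanceProcessor) -> None:
    try:
        processor.do_engine_steps(save=True)
    except WorkflowExecutionServiceError as error:
        # the wrapper copies line_number, offset, and error_line from the
        # underlying WorkflowTaskException, so a UI can point at the failing line
        print(error.line_number, error.error_line, error.offset)
        raise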


@ -36,6 +36,7 @@ from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.tasks_controller import _interstitial_stream
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -558,6 +559,47 @@ class TestProcessApi(BaseTest):
assert simple_form["is_executable"] is True
assert simple_form["is_primary"] is True
def test_process_callers(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""It should be possible to get a list of all processes that call another process."""
load_test_spec(
"test_group_one/simple_form",
process_model_source_directory="simple_form",
bpmn_file_name="simple_form",
)
# When adding a process model with one Process, no decisions, and some json files, only one process is recorded.
assert len(SpecReferenceCache.query.all()) == 1
# but no callers are recorded
assert ProcessCallerService.count() == 0
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id="test_group_two",
process_model_id="call_activity_nested",
bpmn_file_location="call_activity_nested",
)
# When adding a process model with 4 processes and a decision, 5 new records will be in the Cache
assert len(SpecReferenceCache.query.all()) == 6
# and 4 callers recorded
assert ProcessCallerService.count() == 4
# get the results
response = client.get(
"/v1.0/processes/Level2/callers",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.json is not None
# We should get 1 back, Level1 calls Level2
assert len(response.json) == 1
caller = response.json[0]
assert caller["identifier"] == "Level1"
def test_process_group_add(
self,
app: Flask,
@@ -2105,7 +2147,7 @@ class TestProcessApi(BaseTest):
assert response.status_code == 400
api_error = json.loads(response.get_data(as_text=True))
assert api_error["error_code"] == "task_error"
assert api_error["error_code"] == "unexpected_workflow_exception"
assert 'TypeError:can only concatenate str (not "int") to str' in api_error["message"]
process = db.session.query(ProcessInstanceModel).filter(ProcessInstanceModel.id == process_instance_id).first()
@@ -2185,7 +2227,7 @@ class TestProcessApi(BaseTest):
processor = ProcessInstanceProcessor(process_instance)
spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance)
assert spiff_task is not None
assert spiff_task.state == TaskState.WAITING
assert spiff_task.state == TaskState.ERROR
assert spiff_task.data == {"my_var": "THE VAR"}
def test_process_model_file_create(

View File

@@ -5,22 +5,19 @@ from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
class TestRefreshPermissions(BaseTest):
"""TestRefreshPermissions."""
def test_refresh_permissions_requires_elevated_permission(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_refresh_permissions_requires_elevated_permission."""
basic_user = self.find_or_create_user("basic_user")
privileged_user = self.find_or_create_user("privileged_user")
self.add_permissions_to_user(
@@ -38,7 +35,7 @@ class TestRefreshPermissions(BaseTest):
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as exception:
with pytest.raises(WorkflowExecutionServiceError) as exception:
processor.do_engine_steps(save=True)
assert "ScriptUnauthorizedForUserError" in str(exception)

View File

@@ -5,7 +5,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend import db
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
@@ -19,6 +18,7 @@ from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
class TestErrorHandlingService(BaseTest):
@@ -34,9 +34,9 @@
process_model.id, user
)
pip = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as e:
with pytest.raises(WorkflowExecutionServiceError) as e:
pip.do_engine_steps(save=True)
ErrorHandlingService().handle_error(pip, e.value)
ErrorHandlingService().handle_error(process_instance, e.value)
return process_instance
def test_handle_error_suspends_or_faults_process(

View File

@@ -0,0 +1,128 @@
from typing import Generator
import pytest
from flask.app import Flask
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_caller import ProcessCallerCacheModel
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
@pytest.fixture()
def with_clean_cache(app: Flask) -> Generator[None, None, None]:
db.session.query(ProcessCallerCacheModel).delete()
db.session.commit()
yield
@pytest.fixture()
def with_no_process_callers(with_clean_cache: None) -> Generator[None, None, None]:
yield
@pytest.fixture()
def with_single_process_caller(with_clean_cache: None) -> Generator[None, None, None]:
db.session.add(ProcessCallerCacheModel(process_identifier="called_once", calling_process_identifier="one_caller"))
db.session.commit()
yield
@pytest.fixture()
def with_multiple_process_callers(with_clean_cache: None) -> Generator[None, None, None]:
db.session.add(ProcessCallerCacheModel(process_identifier="called_many", calling_process_identifier="one_caller"))
db.session.add(ProcessCallerCacheModel(process_identifier="called_many", calling_process_identifier="two_caller"))
db.session.add(
ProcessCallerCacheModel(process_identifier="called_many", calling_process_identifier="three_caller")
)
db.session.commit()
yield
class TestProcessCallerService(BaseTest):
"""Infer from class name."""
def test_has_zero_count_when_empty(self, with_no_process_callers: None) -> None:
assert ProcessCallerService.count() == 0
def test_has_expected_count_when_not_empty(self, with_multiple_process_callers: None) -> None:
assert ProcessCallerService.count() == 3
def test_can_clear_the_cache(self, with_multiple_process_callers: None) -> None:
ProcessCallerService.clear_cache()
assert ProcessCallerService.count() == 0
def test_can_clear_the_cache_when_empty(self, with_no_process_callers: None) -> None:
ProcessCallerService.clear_cache()
assert ProcessCallerService.count() == 0
def test_can_clear_the_cache_for_process_id(self, with_single_process_caller: None) -> None:
ProcessCallerService.clear_cache_for_process_ids(["called_once"])
assert ProcessCallerService.count() == 0
def test_can_clear_the_cache_for_calling_process_id(self, with_multiple_process_callers: None) -> None:
ProcessCallerService.clear_cache_for_process_ids(["one_caller"])
assert ProcessCallerService.count() == 2
def test_can_clear_the_cache_for_callee_caller_process_id(
self, with_single_process_caller: None, with_multiple_process_callers: None
) -> None:
ProcessCallerService.clear_cache_for_process_ids(["one_caller"])
assert ProcessCallerService.count() == 2
def test_can_clear_the_cache_for_process_id_and_leave_other_process_ids_alone(
self,
with_single_process_caller: None,
with_multiple_process_callers: None,
) -> None:
ProcessCallerService.clear_cache_for_process_ids(["called_many"])
assert ProcessCallerService.count() == 1
def test_can_clear_the_cache_for_process_id_when_it_doesnt_exist(
self,
with_multiple_process_callers: None,
) -> None:
ProcessCallerService.clear_cache_for_process_ids(["garbage"])
assert ProcessCallerService.count() == 3
def test_no_records_added_if_calling_process_ids_is_empty(self, with_no_process_callers: None) -> None:
ProcessCallerService.add_caller("bob", [])
assert ProcessCallerService.count() == 0
def test_can_add_caller_for_new_process(self, with_no_process_callers: None) -> None:
ProcessCallerService.add_caller("bob", ["new_caller"])
assert ProcessCallerService.count() == 1
    def test_can_add_many_callers_for_new_process(self, with_no_process_callers: None) -> None:
ProcessCallerService.add_caller("bob", ["new_caller", "another_new_caller"])
assert ProcessCallerService.count() == 2
def test_can_add_caller_for_existing_process(self, with_multiple_process_callers: None) -> None:
ProcessCallerService.add_caller("called_many", ["new_caller"])
assert ProcessCallerService.count() == 4
def test_can_add_many_callers_for_existing_process(self, with_multiple_process_callers: None) -> None:
ProcessCallerService.add_caller("called_many", ["new_caller", "another_new_caller"])
assert ProcessCallerService.count() == 5
def test_can_track_duplicate_callers(self, with_no_process_callers: None) -> None:
ProcessCallerService.add_caller("bob", ["new_caller", "new_caller"])
assert ProcessCallerService.count() == 2
def test_can_return_no_callers_when_no_records(self, with_no_process_callers: None) -> None:
assert ProcessCallerService.callers("bob") == []
def test_can_return_no_callers_when_process_id_is_unknown(self, with_multiple_process_callers: None) -> None:
assert ProcessCallerService.callers("bob") == []
def test_can_return_single_caller(self, with_single_process_caller: None) -> None:
assert ProcessCallerService.callers("called_once") == ["one_caller"]
    def test_can_return_multiple_callers(self, with_multiple_process_callers: None) -> None:
callers = sorted(ProcessCallerService.callers("called_many"))
assert callers == ["one_caller", "three_caller", "two_caller"]
def test_can_return_single_caller_when_there_are_other_process_ids(
self, with_single_process_caller: None, with_multiple_process_callers: None
) -> None:
assert ProcessCallerService.callers("called_once") == ["one_caller"]

View File

@@ -10,12 +10,12 @@ from SpiffWorkflow.task import TaskState
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel
@@ -29,6 +29,7 @@ from spiffworkflow_backend.services.process_instance_processor import (
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
class TestProcessInstanceProcessor(BaseTest):
@@ -720,7 +721,7 @@ class TestProcessInstanceProcessor(BaseTest):
spiff_task = processor.get_task_by_guid(human_task_three.task_id)
ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_three)
def test_task_data_is_set_even_if_process_instance_errors(
def test_task_data_is_set_even_if_process_instance_errors_and_creates_task_failed_event(
self,
app: Flask,
client: FlaskClient,
@@ -738,7 +739,7 @@
)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError):
with pytest.raises(WorkflowExecutionServiceError):
processor.do_engine_steps(save=True)
process_instance_final = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
@@ -748,5 +749,22 @@
"script_task_two", processor_final.bpmn_process_instance
)
assert spiff_task is not None
assert spiff_task.state == TaskState.WAITING
assert spiff_task.state == TaskState.ERROR
assert spiff_task.data == {"my_var": "THE VAR"}
process_instance_events = process_instance.process_instance_events
assert len(process_instance_events) == 4
error_events = [
e for e in process_instance_events if e.event_type == ProcessInstanceEventType.task_failed.value
]
assert len(error_events) == 1
error_event = error_events[0]
assert error_event.task_guid is not None
process_instance_error_details = error_event.error_details
assert len(process_instance_error_details) == 1
error_detail = process_instance_error_details[0]
assert error_detail.message == "NameError:name 'hey' is not defined. Did you mean 'my_var'?"
assert error_detail.task_offset is None
assert error_detail.task_line_number == 1
assert error_detail.task_line_contents == "hey"
assert error_detail.task_trace is not None

View File

@@ -5,24 +5,21 @@ from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
class TestOpenFile(BaseTest):
"""TestVariousBpmnConstructs."""
def test_dot_notation(
class TestRestrictedScriptEngine(BaseTest):
def test_dot_notation_with_open_file(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_form_data_conversion_to_dot_dict."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
@@ -34,22 +31,17 @@ class TestOpenFile(BaseTest):
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as exception:
with pytest.raises(WorkflowExecutionServiceError) as exception:
processor.do_engine_steps(save=True)
assert "name 'open' is not defined" in str(exception.value)
class TestImportModule(BaseTest):
"""TestVariousBpmnConstructs."""
def test_dot_notation(
def test_dot_notation_with_import_module(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_form_data_conversion_to_dot_dict."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
@@ -61,6 +53,6 @@ class TestImportModule(BaseTest):
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as exception:
with pytest.raises(WorkflowExecutionServiceError) as exception:
processor.do_engine_steps(save=True)
assert "Import not allowed: os" in str(exception.value)

View File

@@ -1,5 +1,10 @@
import { Notification } from './Notification';
import useAPIError from '../hooks/UseApiError';
import {
ErrorForDisplay,
ProcessInstanceEventErrorDetail,
ProcessInstanceLogEntry,
} from '../interfaces';
function errorDetailDisplay(
errorObject: any,
@@ -18,57 +23,96 @@ function errorDetailDisplay(
return null;
}
export const errorForDisplayFromProcessInstanceErrorDetail = (
processInstanceEvent: ProcessInstanceLogEntry,
processInstanceErrorEventDetail: ProcessInstanceEventErrorDetail
) => {
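  // Map a log entry and its error detail onto the ErrorForDisplay shape consumed by childrenForErrorObject.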
const errorForDisplay: ErrorForDisplay = {
message: processInstanceErrorEventDetail.message,
messageClassName: 'failure-string',
task_name: processInstanceEvent.task_definition_name,
task_id: processInstanceEvent.task_definition_identifier,
line_number: processInstanceErrorEventDetail.task_line_number,
error_line: processInstanceErrorEventDetail.task_line_contents,
task_trace: processInstanceErrorEventDetail.task_trace,
stacktrace: processInstanceErrorEventDetail.stacktrace,
};
return errorForDisplay;
};
export const childrenForErrorObject = (errorObject: ErrorForDisplay) => {
let sentryLinkTag = null;
if (errorObject.sentry_link) {
sentryLinkTag = (
<span>
{
': Find details about this error here (it may take a moment to become available): '
}
<a href={errorObject.sentry_link} target="_blank" rel="noreferrer">
{errorObject.sentry_link}
</a>
</span>
);
}
const message = (
<div className={errorObject.messageClassName}>{errorObject.message}</div>
);
const taskName = errorDetailDisplay(errorObject, 'task_name', 'Task Name');
const taskId = errorDetailDisplay(errorObject, 'task_id', 'Task ID');
const fileName = errorDetailDisplay(errorObject, 'file_name', 'File Name');
const lineNumber = errorDetailDisplay(
errorObject,
'line_number',
'Line Number'
);
const errorLine = errorDetailDisplay(errorObject, 'error_line', 'Context');
let codeTrace = null;
if (errorObject.task_trace && errorObject.task_trace.length > 0) {
codeTrace = (
<div className="error_info">
<span className="error_title">Call Activity Trace:</span>
{errorObject.task_trace.reverse().join(' -> ')}
</div>
);
} else if (errorObject.stacktrace) {
codeTrace = (
<pre className="error_info">
<span className="error_title">Stacktrace:</span>
{errorObject.stacktrace.reverse().map((a) => (
<>
{a}
<br />
</>
))}
</pre>
);
}
return [
message,
<br />,
sentryLinkTag,
taskName,
taskId,
fileName,
lineNumber,
errorLine,
codeTrace,
];
};
export default function ErrorDisplay() {
const errorObject = useAPIError().error;
const { removeError } = useAPIError();
let errorTag = null;
if (errorObject) {
let sentryLinkTag = null;
if (errorObject.sentry_link) {
sentryLinkTag = (
<span>
{
': Find details about this error here (it may take a moment to become available): '
}
<a href={errorObject.sentry_link} target="_blank" rel="noreferrer">
{errorObject.sentry_link}
</a>
</span>
);
}
const message = <div>{errorObject.message}</div>;
if (errorObject) {
const title = 'Error:';
const taskName = errorDetailDisplay(errorObject, 'task_name', 'Task Name');
const taskId = errorDetailDisplay(errorObject, 'task_id', 'Task ID');
const fileName = errorDetailDisplay(errorObject, 'file_name', 'File Name');
const lineNumber = errorDetailDisplay(
errorObject,
'line_number',
'Line Number'
);
const errorLine = errorDetailDisplay(errorObject, 'error_line', 'Context');
let taskTrace = null;
if (errorObject.task_trace && errorObject.task_trace.length > 1) {
taskTrace = (
<div className="error_info">
<span className="error_title">Call Activity Trace:</span>
{errorObject.task_trace.reverse().join(' -> ')}
</div>
);
}
errorTag = (
<Notification title={title} onClose={() => removeError()} type="error">
{message}
<br />
{sentryLinkTag}
{taskName}
{taskId}
{fileName}
{lineNumber}
{errorLine}
{taskTrace}
<>{childrenForErrorObject(errorObject)}</>
</Notification>
);
}

View File

@@ -66,9 +66,7 @@ import { usePermissionFetcher } from '../hooks/PermissionService';
type OwnProps = {
processModelId: string;
diagramType: string;
readyOrWaitingProcessInstanceTasks?: Task[] | null;
completedProcessInstanceTasks?: Task[] | null;
cancelledProcessInstanceTasks?: Task[] | null;
tasks?: Task[] | null;
saveDiagram?: (..._args: any[]) => any;
onDeleteFile?: (..._args: any[]) => any;
isPrimaryFile?: boolean;
@@ -93,9 +91,7 @@ type OwnProps = {
export default function ReactDiagramEditor({
processModelId,
diagramType,
readyOrWaitingProcessInstanceTasks,
completedProcessInstanceTasks,
cancelledProcessInstanceTasks,
tasks,
saveDiagram,
onDeleteFile,
isPrimaryFile,
@@ -418,43 +414,29 @@ export default function ReactDiagramEditor({
// highlighting a field
// Option 3 at:
// https://github.com/bpmn-io/bpmn-js-examples/tree/master/colors
if (readyOrWaitingProcessInstanceTasks) {
if (tasks) {
const bpmnProcessIdentifiers = getBpmnProcessIdentifiers(
canvas.getRootElement()
);
readyOrWaitingProcessInstanceTasks.forEach((readyOrWaitingBpmnTask) => {
highlightBpmnIoElement(
canvas,
readyOrWaitingBpmnTask,
'active-task-highlight',
bpmnProcessIdentifiers
);
});
}
if (completedProcessInstanceTasks) {
const bpmnProcessIdentifiers = getBpmnProcessIdentifiers(
canvas.getRootElement()
);
completedProcessInstanceTasks.forEach((completedTask) => {
highlightBpmnIoElement(
canvas,
completedTask,
'completed-task-highlight',
bpmnProcessIdentifiers
);
});
}
if (cancelledProcessInstanceTasks) {
const bpmnProcessIdentifiers = getBpmnProcessIdentifiers(
canvas.getRootElement()
);
cancelledProcessInstanceTasks.forEach((cancelledTask) => {
highlightBpmnIoElement(
canvas,
cancelledTask,
'cancelled-task-highlight',
bpmnProcessIdentifiers
);
tasks.forEach((task: Task) => {
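        // Pick a highlight class from the task state; states without a mapping get no highlight.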
let className = '';
if (task.state === 'COMPLETED') {
className = 'completed-task-highlight';
} else if (task.state === 'READY' || task.state === 'WAITING') {
className = 'active-task-highlight';
} else if (task.state === 'CANCELLED') {
className = 'cancelled-task-highlight';
} else if (task.state === 'ERROR') {
className = 'errored-task-highlight';
}
if (className) {
highlightBpmnIoElement(
canvas,
task,
className,
bpmnProcessIdentifiers
);
}
});
}
}
@@ -534,10 +516,8 @@
diagramType,
diagramXML,
diagramXMLString,
readyOrWaitingProcessInstanceTasks,
completedProcessInstanceTasks,
cancelledProcessInstanceTasks,
fileName,
tasks,
performingXmlUpdates,
processModelId,
url,

View File

@@ -71,9 +66,7 @@ export const convertDateObjectToFormattedString = (dateObject: Date) => {
};
export const dateStringToYMDFormat = (dateString: string) => {
if (dateString.length < 10) {
return dateString;
}
if (DATE_FORMAT.startsWith('dd')) {
const d = dateString.split('-');
return `${d[2]}-${d[1]}-${d[0]}`;
@@ -107,12 +104,15 @@ export const convertDateAndTimeStringsToSeconds = (
};
export const convertStringToDate = (dateString: string) => {
return convertDateAndTimeStringsToSeconds(dateString, '00:10:00');
return convertDateAndTimeStringsToDate(dateString, '00:10:00');
};
export const ymdDateStringToConfiguredFormat = (dateString: string) => {
const dateObject = convertStringToDate(dateString);
return convertDateObjectToFormattedString(dateObject);
if (dateObject) {
return convertDateObjectToFormattedString(dateObject);
}
return null;
};
export const convertSecondsToDateObject = (seconds: number) => {
@@ -155,7 +155,10 @@ export const convertSecondsToFormattedDateString = (seconds: number) => {
export const convertDateStringToSeconds = (dateString: string) => {
const dateObject = convertStringToDate(dateString);
return convertDateToSeconds(dateObject);
if (dateObject) {
return convertDateToSeconds(dateObject);
}
return null;
};
export const objectIsEmpty = (obj: object) => {

View File

@@ -10,18 +10,19 @@ export const useUriListForPermissions = () => {
processGroupListPath: '/v1.0/process-groups',
processGroupShowPath: `/v1.0/process-groups/${params.process_group_id}`,
processInstanceActionPath: `/v1.0/process-instances/${params.process_model_id}/${params.process_instance_id}`,
processInstanceCompleteTaskPath: `/v1.0/complete-task/${params.process_model_id}/${params.process_instance_id}`,
processInstanceCreatePath: `/v1.0/process-instances/${params.process_model_id}`,
processInstanceErrorEventDetails: `/v1.0/event-error-details/${params.process_model_id}/${params.process_instance_id}`,
processInstanceListPath: '/v1.0/process-instances',
processInstanceLogListPath: `/v1.0/logs/${params.process_model_id}/${params.process_instance_id}`,
processInstanceReportListPath: '/v1.0/process-instances/reports',
processInstanceResumePath: `/v1.0/process-instance-resume/${params.process_model_id}/${params.process_instance_id}`,
processInstanceSuspendPath: `/v1.0/process-instance-suspend/${params.process_model_id}/${params.process_instance_id}`,
processInstanceResetPath: `/v1.0/process-instance-reset/${params.process_model_id}/${params.process_instance_id}`,
processInstanceTaskDataPath: `/v1.0/task-data/${params.process_model_id}/${params.process_instance_id}`,
processInstanceResumePath: `/v1.0/process-instance-resume/${params.process_model_id}/${params.process_instance_id}`,
processInstanceSendEventPath: `/v1.0/send-event/${params.process_model_id}/${params.process_instance_id}`,
processInstanceCompleteTaskPath: `/v1.0/complete-task/${params.process_model_id}/${params.process_instance_id}`,
processInstanceTaskListPath: `/v1.0/process-instances/${params.process_model_id}/${params.process_instance_id}/task-info`,
processInstanceSuspendPath: `/v1.0/process-instance-suspend/${params.process_model_id}/${params.process_instance_id}`,
processInstanceTaskDataPath: `/v1.0/task-data/${params.process_model_id}/${params.process_instance_id}`,
processInstanceTaskListForMePath: `/v1.0/process-instances/for-me/${params.process_model_id}/${params.process_instance_id}/task-info`,
processInstanceTaskListPath: `/v1.0/process-instances/${params.process_model_id}/${params.process_instance_id}/task-info`,
processInstanceTerminatePath: `/v1.0/process-instance-terminate/${params.process_model_id}/${params.process_instance_id}`,
processModelCreatePath: `/v1.0/process-models/${params.process_group_id}`,
processModelFileCreatePath: `/v1.0/process-models/${params.process_model_id}/files`,

View File

@@ -146,6 +146,10 @@ code {
fill: blue !important;
opacity: .2;
}
.errored-task-highlight:not(.djs-connection) .djs-visual > :nth-child(1) {
fill: red !important;
opacity: .2;
}
.accordion-item-label {
vertical-align: middle;

View File

@@ -37,6 +37,7 @@ export interface EventDefinition {
message_var?: string;
}
// TODO: merge with ProcessInstanceTask
export interface Task {
id: number;
guid: string;
@@ -53,12 +54,6 @@ export interface Task {
event_definition?: EventDefinition;
}
export interface TaskIds {
completed: Task[];
readyOrWaiting: Task[];
cancelled: Task[];
}
export interface ProcessInstanceTask {
id: string;
task_id: string;
@@ -232,12 +227,16 @@ export type HotCrumbItem = HotCrumbItemArray | HotCrumbItemObject;
export interface ErrorForDisplay {
message: string;
messageClassName?: string;
sentry_link?: string;
task_name?: string;
task_id?: string;
line_number?: number;
error_line?: string;
file_name?: string;
task_trace?: [string];
task_trace?: string[];
stacktrace?: string[];
}
export interface AuthenticationParam {
@@ -295,6 +294,16 @@ export interface JsonSchemaForm {
required: string[];
}
export interface ProcessInstanceEventErrorDetail {
id: number;
message: string;
stacktrace: string[];
task_line_contents?: string;
task_line_number?: number;
task_offset?: number;
task_trace?: string[];
}
export interface ProcessInstanceLogEntry {
bpmn_process_definition_identifier: string;
bpmn_process_definition_name: string;

View File

@@ -1,4 +1,5 @@
import { useEffect, useState } from 'react';
import { ErrorOutline } from '@carbon/icons-react';
import {
Table,
Tabs,
@@ -10,6 +11,8 @@
Button,
TextInput,
ComboBox,
Modal,
Loading,
// @ts-ignore
} from '@carbon/react';
import {
@@ -28,8 +31,17 @@
} from '../helpers';
import HttpService from '../services/HttpService';
import { useUriListForPermissions } from '../hooks/UriListForPermissions';
import { ProcessInstanceLogEntry } from '../interfaces';
import {
PermissionsToCheck,
ProcessInstanceEventErrorDetail,
ProcessInstanceLogEntry,
} from '../interfaces';
import Filters from '../components/Filters';
import { usePermissionFetcher } from '../hooks/PermissionService';
import {
childrenForErrorObject,
errorForDisplayFromProcessInstanceErrorDetail,
} from '../components/ErrorDisplay';
type OwnProps = {
variant: string;
@@ -47,10 +59,16 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
const [taskTypes, setTaskTypes] = useState<string[]>([]);
const [eventTypes, setEventTypes] = useState<string[]>([]);
const { targetUris } = useUriListForPermissions();
const isDetailedView = searchParams.get('detailed') === 'true';
const [eventForModal, setEventForModal] =
useState<ProcessInstanceLogEntry | null>(null);
const [eventErrorDetails, setEventErrorDetails] =
useState<ProcessInstanceEventErrorDetail | null>(null);
const taskNameHeader = isDetailedView ? 'Task Name' : 'Milestone';
const { targetUris } = useUriListForPermissions();
const permissionRequestData: PermissionsToCheck = {
[targetUris.processInstanceErrorEventDetails]: ['GET'],
};
const { ability } = usePermissionFetcher(permissionRequestData);
const [showFilterOptions, setShowFilterOptions] = useState<boolean>(false);
@@ -58,6 +76,8 @@
if (variant === 'all') {
processInstanceShowPageBaseUrl = `/admin/process-instances/${params.process_model_id}`;
}
const isDetailedView = searchParams.get('detailed') === 'true';
const taskNameHeader = isDetailedView ? 'Task Name' : 'Milestone';
const updateSearchParams = (value: string, key: string) => {
if (value) {
@@ -128,6 +148,92 @@
isDetailedView,
]);
const handleErrorEventModalClose = () => {
setEventForModal(null);
setEventErrorDetails(null);
};
const errorEventModal = () => {
if (eventForModal) {
const modalHeading = 'Event Error Details';
let errorMessageTag = (
<Loading className="some-class" withOverlay={false} small />
);
if (eventErrorDetails) {
const errorForDisplay = errorForDisplayFromProcessInstanceErrorDetail(
eventForModal,
eventErrorDetails
);
const errorChildren = childrenForErrorObject(errorForDisplay);
// eslint-disable-next-line react/jsx-no-useless-fragment
errorMessageTag = <>{errorChildren}</>;
}
return (
<Modal
open={!!eventForModal}
passiveModal
onRequestClose={handleErrorEventModalClose}
modalHeading={modalHeading}
modalLabel="Error Details"
>
{errorMessageTag}
</Modal>
);
}
return null;
};
  const handleErrorDetailsResponse = (
results: ProcessInstanceEventErrorDetail
) => {
setEventErrorDetails(results);
};
const getErrorDetailsForEvent = (logEntry: ProcessInstanceLogEntry) => {
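    // Open the modal immediately; it shows a loading spinner until the details request resolves.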
setEventForModal(logEntry);
if (ability.can('GET', targetUris.processInstanceErrorEventDetails)) {
HttpService.makeCallToBackend({
path: `${targetUris.processInstanceErrorEventDetails}/${logEntry.id}`,
httpMethod: 'GET',
        successCallback: handleErrorDetailsResponse,
failureCallback: (error: any) => {
const errorObject: ProcessInstanceEventErrorDetail = {
id: 0,
message: `ERROR retrieving error details: ${error.message}`,
stacktrace: [],
};
setEventErrorDetails(errorObject);
},
});
}
};
const eventTypeCell = (logEntry: ProcessInstanceLogEntry) => {
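    // Error-bearing events render as a button that opens the details modal; other events render as plain text.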
if (
['process_instance_error', 'task_failed'].includes(logEntry.event_type)
) {
const errorTitle = 'Event has an error';
const errorIcon = (
<>
&nbsp;
<ErrorOutline className="red-icon" />
</>
);
return (
<Button
kind="ghost"
className="button-link"
onClick={() => getErrorDetailsForEvent(logEntry)}
title={errorTitle}
>
{logEntry.event_type}
{errorIcon}
</Button>
);
}
return logEntry.event_type;
};
const getTableRow = (logEntry: ProcessInstanceLogEntry) => {
const tableRow = [];
const taskNameCell = (
@@ -164,7 +270,7 @@
<>
<td>{logEntry.task_definition_identifier}</td>
<td>{logEntry.bpmn_task_type}</td>
<td>{logEntry.event_type}</td>
<td>{eventTypeCell(logEntry)}</td>
<td>
{logEntry.username || (
<span className="system-user-log-entry">system</span>
@@ -405,6 +511,7 @@
]}
/>
{tabs()}
{errorEventModal()}
<Filters
filterOptions={filterOptions}
showFilterOptions={showFilterOptions}

View File

@@ -47,7 +47,6 @@ import {
ProcessInstanceMetadata,
Task,
TaskDefinitionPropertiesJson,
TaskIds,
} from '../interfaces';
import { usePermissionFetcher } from '../hooks/PermissionService';
import ProcessInstanceClass from '../classes/ProcessInstanceClass';
@@ -230,27 +229,6 @@
});
};
const getTaskIds = () => {
const taskIds: TaskIds = {
completed: [],
readyOrWaiting: [],
cancelled: [],
};
if (tasks) {
tasks.forEach(function getUserTasksElement(task: Task) {
if (task.state === 'COMPLETED') {
taskIds.completed.push(task);
} else if (task.state === 'READY' || task.state === 'WAITING') {
taskIds.readyOrWaiting.push(task);
} else if (task.state === 'CANCELLED') {
taskIds.cancelled.push(task);
}
return null;
});
}
return taskIds;
};
const currentToTaskGuid = () => {
if (taskToTimeTravelTo) {
return taskToTimeTravelTo.guid;
@@ -1098,7 +1076,6 @@
};
if (processInstance && (tasks || tasksCallHadError)) {
const taskIds = getTaskIds();
const processModelId = unModifyProcessIdentifierForPathParam(
params.process_model_id ? params.process_model_id : ''
);
@@ -1156,9 +1133,7 @@
processModelId={processModelId || ''}
diagramXML={processInstance.bpmn_xml_file_contents || ''}
fileName={processInstance.bpmn_xml_file_contents || ''}
readyOrWaitingProcessInstanceTasks={taskIds.readyOrWaiting}
completedProcessInstanceTasks={taskIds.completed}
cancelledProcessInstanceTasks={taskIds.cancelled}
tasks={tasks}
diagramType="readonly"
onElementClick={handleClickedDiagramTask}
/>

View File

@@ -102,7 +104,11 @@ export default function BaseInputTemplate<
    // it should be in y-m-d format when it gets here.
let dateValue: string | null = '';
if (value || value === 0) {
dateValue = ymdDateStringToConfiguredFormat(value);
if (value.length < 10) {
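        // A complete y-m-d date is 10 characters; shorter values are still being typed, so pass them through.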
dateValue = value;
} else {
dateValue = ymdDateStringToConfiguredFormat(value);
}
}
component = (

View File

@@ -1,2 +1,3 @@
// carbon/react is not very typescript safe so ignore it
declare module '@carbon/react';
declare module '@carbon/icons-react';