From 88df3bd5c388559cb9c877f9731bb3042919d5aa Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 17 Mar 2023 13:20:06 -0400 Subject: [PATCH] added process instance event table --- spiffworkflow-backend/migrations/env.py | 2 + .../{8dce75b80bfd_.py => 05153ab6a6b8_.py} | 26 ++++++- .../models/process_instance_event.py | 41 +++++++++++ .../routes/process_instances_controller.py | 45 +++--------- .../services/process_instance_processor.py | 22 ++++++ .../services/workflow_execution_service.py | 22 +++++- .../integration/test_logging_service.py | 4 +- .../src/routes/ProcessInstanceLogList.tsx | 18 ++++- .../src/routes/ProcessInstanceShow.tsx | 71 +++++++++++++++++++ 9 files changed, 203 insertions(+), 48 deletions(-) rename spiffworkflow-backend/migrations/versions/{8dce75b80bfd_.py => 05153ab6a6b8_.py} (94%) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py diff --git a/spiffworkflow-backend/migrations/env.py b/spiffworkflow-backend/migrations/env.py index 630e381a..68feded2 100644 --- a/spiffworkflow-backend/migrations/env.py +++ b/spiffworkflow-backend/migrations/env.py @@ -1,3 +1,5 @@ +from __future__ import with_statement + import logging from logging.config import fileConfig diff --git a/spiffworkflow-backend/migrations/versions/8dce75b80bfd_.py b/spiffworkflow-backend/migrations/versions/05153ab6a6b8_.py similarity index 94% rename from spiffworkflow-backend/migrations/versions/8dce75b80bfd_.py rename to spiffworkflow-backend/migrations/versions/05153ab6a6b8_.py index 6618a304..5ee8fda7 100644 --- a/spiffworkflow-backend/migrations/versions/8dce75b80bfd_.py +++ b/spiffworkflow-backend/migrations/versions/05153ab6a6b8_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 8dce75b80bfd +Revision ID: 05153ab6a6b8 Revises: -Create Date: 2023-03-17 09:08:24.146736 +Create Date: 2023-03-17 12:22:43.449203 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '8dce75b80bfd' +revision = '05153ab6a6b8' down_revision = None branch_labels = None depends_on = None @@ -269,6 +269,21 @@ def upgrade(): sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), sa.PrimaryKeyConstraint('id') ) + op.create_table('process_instance_event', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('task_guid', sa.String(length=36), nullable=True), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('event_type', sa.String(length=50), nullable=False), + sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_process_instance_event_event_type'), 'process_instance_event', ['event_type'], unique=False) + op.create_index(op.f('ix_process_instance_event_task_guid'), 'process_instance_event', ['task_guid'], unique=False) + op.create_index(op.f('ix_process_instance_event_timestamp'), 'process_instance_event', ['timestamp'], unique=False) + op.create_index(op.f('ix_process_instance_event_user_id'), 'process_instance_event', ['user_id'], unique=False) op.create_table('process_instance_file_data', sa.Column('id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False), @@ -424,6 +439,11 @@ def downgrade(): op.drop_table('process_instance_metadata') op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data') op.drop_table('process_instance_file_data') + op.drop_index(op.f('ix_process_instance_event_user_id'), table_name='process_instance_event') + op.drop_index(op.f('ix_process_instance_event_timestamp'), table_name='process_instance_event') + op.drop_index(op.f('ix_process_instance_event_task_guid'), table_name='process_instance_event') + op.drop_index(op.f('ix_process_instance_event_event_type'), table_name='process_instance_event') + op.drop_table('process_instance_event') op.drop_table('message_instance') op.drop_index(op.f('ix_process_instance_process_model_identifier'), table_name='process_instance') op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance') diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py new file mode 100644 index 00000000..de965e9a --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_event.py @@ -0,0 +1,41 @@ +from __future__ import annotations +from spiffworkflow_backend.models.user import UserModel +from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum +from typing import Any +from sqlalchemy.orm import validates + +from sqlalchemy import ForeignKey + +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel + + +# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same. +class ProcessInstanceEventType(SpiffEnum): + process_instance_resumed = "process_instance_resumed" + process_instance_rewound_to_task = "process_instance_rewound_to_task" + process_instance_suspended = "process_instance_suspended" + process_instance_terminated = "process_instance_terminated" + task_completed = "task_completed" + task_data_edited = "task_data_edited" + task_executed_manually = "task_executed_manually" + task_failed = "task_failed" + task_skipped = "task_skipped" + + +class ProcessInstanceEventModel(SpiffworkflowBaseDBModel): + __tablename__ = "process_instance_event" + id: int = db.Column(db.Integer, primary_key=True) + + # use task guid so we can bulk insert without worrying about whether or not the task has an id yet + task_guid: str | None = db.Column(db.String(36), nullable=True, index=True) + process_instance_id: int = db.Column(ForeignKey("process_instance.id"), nullable=False) + + event_type: str = db.Column(db.String(50), nullable=False, index=True) + timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False, index=True) + + user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore + + @validates("event_type") + def validate_event_type(self, key: str, value: Any) -> Any: + return self.validate_enum_field(key, value, ProcessInstanceEventType) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index cd84f3d4..e82a1aa9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -29,6 +29,7 @@ from spiffworkflow_backend.models.process_instance import ( ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.process_instance_metadata import ( ProcessInstanceMetadataModel, ) @@ -240,37 +241,11 @@ def process_instance_log_list( # to make sure the process instance exists process_instance = _find_process_instance_by_id_or_raise(process_instance_id) - # log_query = SpiffLoggingModel.query.filter(SpiffLoggingModel.process_instance_id == process_instance.id) - # if not detailed: - # log_query = log_query.filter( - # # 1. this was the previous implementation, where we only show completed tasks and skipped tasks. - # # maybe we want to iterate on this in the future (in a third tab under process instance logs?) - # # or_( - # # SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore - # # SpiffLoggingModel.message.like("Skipped task %"), # type: ignore - # # ) - # # 2. We included ["End Event", "Default Start Event"] along with Default Throwing Event, but feb 2023 - # # we decided to remove them, since they get really chatty when there are lots of subprocesses and call activities. - # and_( - # SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore - # SpiffLoggingModel.bpmn_task_type.in_(["Default Throwing Event"]), # type: ignore - # ) - # ) - # - # logs = ( - # log_query.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore - # .join( - # UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True - # ) # isouter since if we don't have a user, we still want the log - # .add_columns( - # UserModel.username, - # ) - # .paginate(page=page, per_page=per_page, error_out=False) - # ) log_query = ( - TaskModel.query.filter_by(process_instance_id=process_instance.id) - .join(TaskDefinitionModel, TaskDefinitionModel.id == TaskModel.task_definition_id) - .join( + ProcessInstanceEventModel.query.filter_by(process_instance_id=process_instance.id) + .outerjoin(TaskModel, TaskModel.guid == ProcessInstanceEventModel.task_guid) + .outerjoin(TaskDefinitionModel, TaskDefinitionModel.id == TaskModel.task_definition_id) + .outerjoin( BpmnProcessDefinitionModel, BpmnProcessDefinitionModel.id == TaskDefinitionModel.bpmn_process_definition_id ) ) @@ -289,15 +264,11 @@ def process_instance_log_list( TaskDefinitionModel.typename.in_(["IntermediateThrowEvent"]), # type: ignore ) ) - else: - log_query = log_query.filter( - TaskModel.state.in_(["COMPLETED"]), # type: ignore - ) logs = ( - log_query.order_by(TaskModel.end_in_seconds.desc(), TaskModel.id.desc()) # type: ignore - .outerjoin(HumanTaskModel, HumanTaskModel.task_model_id == TaskModel.id) - .outerjoin(UserModel, UserModel.id == HumanTaskModel.completed_by_user_id) + log_query.order_by(ProcessInstanceEventModel.timestamp.desc(), + ProcessInstanceEventModel.id.desc()) # type: ignore + .outerjoin(UserModel, UserModel.id == ProcessInstanceEventModel.user_id) .add_columns( TaskModel.guid.label("spiff_task_guid"), # type: ignore UserModel.username, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 7435f39e..0338aee3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,5 +1,7 @@ """Process_instance_processor.""" import _strptime # type: ignore +from flask import g +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel, ProcessInstanceEventType import decimal import json import logging @@ -1811,6 +1813,9 @@ class ProcessInstanceProcessor: json_data = JsonDataModel(**json_data_dict) db.session.add(json_data) + self.add_event_to_process_instance(self.process_instance_model, + ProcessInstanceEventType.task_completed.value, task_guid=task_model.guid) + # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) self.save() @@ -1935,16 +1940,33 @@ class ProcessInstanceProcessor: self.save() self.process_instance_model.status = "terminated" db.session.add(self.process_instance_model) + self.add_event_to_process_instance(self.process_instance_model, + ProcessInstanceEventType.process_instance_terminated.value) db.session.commit() def suspend(self) -> None: """Suspend.""" self.process_instance_model.status = ProcessInstanceStatus.suspended.value db.session.add(self.process_instance_model) + self.add_event_to_process_instance(self.process_instance_model, + ProcessInstanceEventType.process_instance_suspended.value) db.session.commit() def resume(self) -> None: """Resume.""" self.process_instance_model.status = ProcessInstanceStatus.waiting.value db.session.add(self.process_instance_model) + self.add_event_to_process_instance(self.process_instance_model, + ProcessInstanceEventType.process_instance_resumed.value) db.session.commit() + + @classmethod + def add_event_to_process_instance(cls, process_instance: ProcessInstanceModel, event_type: str, task_guid: Optional[str] = None) -> None: + user_id = None + if g.user: + user_id = g.user.id + process_instance_event = ProcessInstanceEventModel( + process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id) + if task_guid: + process_instance_event.task_guid = task_guid + db.session.add(process_instance_event) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index fe98ac80..f7320e34 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -17,6 +17,7 @@ from spiffworkflow_backend.models.message_instance_correlation import ( MessageInstanceCorrelationRuleModel, ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.services.assertion_service import safe_assertion @@ -63,12 +64,14 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings + self.serializer = serializer self.current_task_model: Optional[TaskModel] = None self.current_task_start_in_seconds: Optional[float] = None + self.task_models: dict[str, TaskModel] = {} self.json_data_dicts: dict[str, JsonDataDict] = {} - self.serializer = serializer + self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} def will_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): @@ -90,9 +93,10 @@ class TaskModelSavingDelegate(EngineStepDelegate): script_engine = bpmn_process_instance.script_engine if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: failing_spiff_task = script_engine.failing_spiff_task - self._update_task_model_with_spiff_task(failing_spiff_task) + self._update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) db.session.bulk_save_objects(self.task_models.values()) + db.session.bulk_save_objects(self.process_instance_events.values()) TaskService.insert_or_update_json_data_records(self.json_data_dicts) @@ -121,7 +125,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): if json_data_dict is not None: self.json_data_dicts[json_data_dict["hash"]] = json_data_dict - def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask) -> TaskModel: + def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask, task_failed: bool = False) -> TaskModel: bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( spiff_task, @@ -141,6 +145,18 @@ class TaskModelSavingDelegate(EngineStepDelegate): json_data_dict_list.append(bpmn_process_json_data) self._update_json_data_dicts_using_list(json_data_dict_list) + if task_model.state == "COMPLETED" or task_failed: + event_type = "task_completed" + if task_failed: + event_type = "task_errored" + + # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete + # which script tasks execute when READY. + timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() + process_instance_event = ProcessInstanceEventModel( + task_guid=task_model.guid, process_instance_id=self.process_instance.id, event_type=event_type, timestamp=timestamp) + self.process_instance_events[task_model.guid] = process_instance_event + return task_model diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_logging_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_logging_service.py index 990cc3ba..f79a3295 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_logging_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_logging_service.py @@ -64,8 +64,8 @@ class TestLoggingService(BaseTest): for log in logs: assert log["process_instance_id"] == process_instance.id for key in [ - "start_in_seconds", - "end_in_seconds", + "event_type", + "timestamp", "spiff_task_guid", "bpmn_process_definition_identifier", "bpmn_process_definition_name", diff --git a/spiffworkflow-frontend/src/routes/ProcessInstanceLogList.tsx b/spiffworkflow-frontend/src/routes/ProcessInstanceLogList.tsx index 05c2eb87..365ab7f1 100644 --- a/spiffworkflow-frontend/src/routes/ProcessInstanceLogList.tsx +++ b/spiffworkflow-frontend/src/routes/ProcessInstanceLogList.tsx @@ -85,6 +85,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) { tableRow.push( <> {row.bpmn_task_type} + {row.event_type} {row.username || ( system @@ -99,7 +100,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) { data-qa="process-instance-show-link" to={`${processInstanceShowPageBaseUrl}/${row.process_instance_id}/${row.spiff_step}`} > - {convertSecondsToFormattedDateTime(row.end_in_seconds)} + {convertSecondsToFormattedDateTime(row.timestamp)} ); @@ -132,6 +133,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) { tableHeaders.push( <> Task Type + Event User ); @@ -177,7 +179,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) { setSearchParams(searchParams); }} > - Simple + Milestones - Detailed + Events + {/* + Suspend + Resumed + Terminated + + Skipped? + Rewind? + Execute? + Edit? + */}
diff --git a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx index 36c06d23..652e07b0 100644 --- a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx +++ b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx @@ -381,6 +381,40 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { {lastUpdatedTimeTag} + {/* + + + Suspended at:{' '} + + + 2023-03-17 10:12:05 (by jason) + + + + + Resumed at:{' '} + + + 2023-03-17 10:13:05 (by jason) + + + + + Suspended at:{' '} + + + 2023-03-17 10:14:05 (by jason) + + + + + Terminated at:{' '} + + + 2023-03-17 10:15:05 (by jason) + + + */} Process model revision:{' '} @@ -400,6 +434,43 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { + + {/* +
+ + + Suspended at:{' '} + + + 2023-03-17 10:12:05 (by jason) + + + + + Resumed at:{' '} + + + 2023-03-17 10:13:05 (by jason) + + + + + Suspended at:{' '} + + + 2023-03-17 10:14:05 (by jason) + + + + + Terminated at:{' '} + + + 2023-03-17 10:15:05 (by jason) + + + */} +