added process instance event table

This commit is contained in:
jasquat 2023-03-17 13:20:06 -04:00
parent 3461056beb
commit 88df3bd5c3
9 changed files with 203 additions and 48 deletions

View File

@ -1,3 +1,5 @@
from __future__ import with_statement
import logging import logging
from logging.config import fileConfig from logging.config import fileConfig

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: 8dce75b80bfd Revision ID: 05153ab6a6b8
Revises: Revises:
Create Date: 2023-03-17 09:08:24.146736 Create Date: 2023-03-17 12:22:43.449203
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '8dce75b80bfd' revision = '05153ab6a6b8'
down_revision = None down_revision = None
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -269,6 +269,21 @@ def upgrade():
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_table('process_instance_event',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('task_guid', sa.String(length=36), nullable=True),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('event_type', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_event_event_type'), 'process_instance_event', ['event_type'], unique=False)
op.create_index(op.f('ix_process_instance_event_task_guid'), 'process_instance_event', ['task_guid'], unique=False)
op.create_index(op.f('ix_process_instance_event_timestamp'), 'process_instance_event', ['timestamp'], unique=False)
op.create_index(op.f('ix_process_instance_event_user_id'), 'process_instance_event', ['user_id'], unique=False)
op.create_table('process_instance_file_data', op.create_table('process_instance_file_data',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False),
@ -424,6 +439,11 @@ def downgrade():
op.drop_table('process_instance_metadata') op.drop_table('process_instance_metadata')
op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data') op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data')
op.drop_table('process_instance_file_data') op.drop_table('process_instance_file_data')
op.drop_index(op.f('ix_process_instance_event_user_id'), table_name='process_instance_event')
op.drop_index(op.f('ix_process_instance_event_timestamp'), table_name='process_instance_event')
op.drop_index(op.f('ix_process_instance_event_task_guid'), table_name='process_instance_event')
op.drop_index(op.f('ix_process_instance_event_event_type'), table_name='process_instance_event')
op.drop_table('process_instance_event')
op.drop_table('message_instance') op.drop_table('message_instance')
op.drop_index(op.f('ix_process_instance_process_model_identifier'), table_name='process_instance') op.drop_index(op.f('ix_process_instance_process_model_identifier'), table_name='process_instance')
op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance') op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance')

View File

@ -0,0 +1,41 @@
from __future__ import annotations
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum
from typing import Any
from sqlalchemy.orm import validates
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same.
class ProcessInstanceEventType(SpiffEnum):
process_instance_resumed = "process_instance_resumed"
process_instance_rewound_to_task = "process_instance_rewound_to_task"
process_instance_suspended = "process_instance_suspended"
process_instance_terminated = "process_instance_terminated"
task_completed = "task_completed"
task_data_edited = "task_data_edited"
task_executed_manually = "task_executed_manually"
task_failed = "task_failed"
task_skipped = "task_skipped"
class ProcessInstanceEventModel(SpiffworkflowBaseDBModel):
__tablename__ = "process_instance_event"
id: int = db.Column(db.Integer, primary_key=True)
# use task guid so we can bulk insert without worrying about whether or not the task has an id yet
task_guid: str | None = db.Column(db.String(36), nullable=True, index=True)
process_instance_id: int = db.Column(ForeignKey("process_instance.id"), nullable=False)
event_type: str = db.Column(db.String(50), nullable=False, index=True)
timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False, index=True)
user_id = db.Column(ForeignKey(UserModel.id), nullable=True, index=True) # type: ignore
@validates("event_type")
def validate_event_type(self, key: str, value: Any) -> Any:
return self.validate_enum_field(key, value, ProcessInstanceEventType)

View File

@ -29,6 +29,7 @@ from spiffworkflow_backend.models.process_instance import (
) )
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_metadata import ( from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel, ProcessInstanceMetadataModel,
) )
@ -240,37 +241,11 @@ def process_instance_log_list(
# to make sure the process instance exists # to make sure the process instance exists
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
# log_query = SpiffLoggingModel.query.filter(SpiffLoggingModel.process_instance_id == process_instance.id)
# if not detailed:
# log_query = log_query.filter(
# # 1. this was the previous implementation, where we only show completed tasks and skipped tasks.
# # maybe we want to iterate on this in the future (in a third tab under process instance logs?)
# # or_(
# # SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
# # SpiffLoggingModel.message.like("Skipped task %"), # type: ignore
# # )
# # 2. We included ["End Event", "Default Start Event"] along with Default Throwing Event, but feb 2023
# # we decided to remove them, since they get really chatty when there are lots of subprocesses and call activities.
# and_(
# SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
# SpiffLoggingModel.bpmn_task_type.in_(["Default Throwing Event"]), # type: ignore
# )
# )
#
# logs = (
# log_query.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore
# .join(
# UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True
# ) # isouter since if we don't have a user, we still want the log
# .add_columns(
# UserModel.username,
# )
# .paginate(page=page, per_page=per_page, error_out=False)
# )
log_query = ( log_query = (
TaskModel.query.filter_by(process_instance_id=process_instance.id) ProcessInstanceEventModel.query.filter_by(process_instance_id=process_instance.id)
.join(TaskDefinitionModel, TaskDefinitionModel.id == TaskModel.task_definition_id) .outerjoin(TaskModel, TaskModel.guid == ProcessInstanceEventModel.task_guid)
.join( .outerjoin(TaskDefinitionModel, TaskDefinitionModel.id == TaskModel.task_definition_id)
.outerjoin(
BpmnProcessDefinitionModel, BpmnProcessDefinitionModel.id == TaskDefinitionModel.bpmn_process_definition_id BpmnProcessDefinitionModel, BpmnProcessDefinitionModel.id == TaskDefinitionModel.bpmn_process_definition_id
) )
) )
@ -289,15 +264,11 @@ def process_instance_log_list(
TaskDefinitionModel.typename.in_(["IntermediateThrowEvent"]), # type: ignore TaskDefinitionModel.typename.in_(["IntermediateThrowEvent"]), # type: ignore
) )
) )
else:
log_query = log_query.filter(
TaskModel.state.in_(["COMPLETED"]), # type: ignore
)
logs = ( logs = (
log_query.order_by(TaskModel.end_in_seconds.desc(), TaskModel.id.desc()) # type: ignore log_query.order_by(ProcessInstanceEventModel.timestamp.desc(),
.outerjoin(HumanTaskModel, HumanTaskModel.task_model_id == TaskModel.id) ProcessInstanceEventModel.id.desc()) # type: ignore
.outerjoin(UserModel, UserModel.id == HumanTaskModel.completed_by_user_id) .outerjoin(UserModel, UserModel.id == ProcessInstanceEventModel.user_id)
.add_columns( .add_columns(
TaskModel.guid.label("spiff_task_guid"), # type: ignore TaskModel.guid.label("spiff_task_guid"), # type: ignore
UserModel.username, UserModel.username,

View File

@ -1,5 +1,7 @@
"""Process_instance_processor.""" """Process_instance_processor."""
import _strptime # type: ignore import _strptime # type: ignore
from flask import g
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel, ProcessInstanceEventType
import decimal import decimal
import json import json
import logging import logging
@ -1811,6 +1813,9 @@ class ProcessInstanceProcessor:
json_data = JsonDataModel(**json_data_dict) json_data = JsonDataModel(**json_data_dict)
db.session.add(json_data) db.session.add(json_data)
self.add_event_to_process_instance(self.process_instance_model,
ProcessInstanceEventType.task_completed.value, task_guid=task_model.guid)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save() self.save()
@ -1935,16 +1940,33 @@ class ProcessInstanceProcessor:
self.save() self.save()
self.process_instance_model.status = "terminated" self.process_instance_model.status = "terminated"
db.session.add(self.process_instance_model) db.session.add(self.process_instance_model)
self.add_event_to_process_instance(self.process_instance_model,
ProcessInstanceEventType.process_instance_terminated.value)
db.session.commit() db.session.commit()
def suspend(self) -> None: def suspend(self) -> None:
"""Suspend.""" """Suspend."""
self.process_instance_model.status = ProcessInstanceStatus.suspended.value self.process_instance_model.status = ProcessInstanceStatus.suspended.value
db.session.add(self.process_instance_model) db.session.add(self.process_instance_model)
self.add_event_to_process_instance(self.process_instance_model,
ProcessInstanceEventType.process_instance_suspended.value)
db.session.commit() db.session.commit()
def resume(self) -> None: def resume(self) -> None:
"""Resume.""" """Resume."""
self.process_instance_model.status = ProcessInstanceStatus.waiting.value self.process_instance_model.status = ProcessInstanceStatus.waiting.value
db.session.add(self.process_instance_model) db.session.add(self.process_instance_model)
self.add_event_to_process_instance(self.process_instance_model,
ProcessInstanceEventType.process_instance_resumed.value)
db.session.commit() db.session.commit()
@classmethod
def add_event_to_process_instance(cls, process_instance: ProcessInstanceModel, event_type: str, task_guid: Optional[str] = None) -> None:
user_id = None
if g.user:
user_id = g.user.id
process_instance_event = ProcessInstanceEventModel(
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id)
if task_guid:
process_instance_event.task_guid = task_guid
db.session.add(process_instance_event)

View File

@ -17,6 +17,7 @@ from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel, MessageInstanceCorrelationRuleModel,
) )
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.assertion_service import safe_assertion
@ -63,12 +64,14 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
self.serializer = serializer
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.current_task_start_in_seconds: Optional[float] = None self.current_task_start_in_seconds: Optional[float] = None
self.task_models: dict[str, TaskModel] = {} self.task_models: dict[str, TaskModel] = {}
self.json_data_dicts: dict[str, JsonDataDict] = {} self.json_data_dicts: dict[str, JsonDataDict] = {}
self.serializer = serializer self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
def will_complete_task(self, spiff_task: SpiffTask) -> None: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model(): if self._should_update_task_model():
@ -90,9 +93,10 @@ class TaskModelSavingDelegate(EngineStepDelegate):
script_engine = bpmn_process_instance.script_engine script_engine = bpmn_process_instance.script_engine
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
failing_spiff_task = script_engine.failing_spiff_task failing_spiff_task = script_engine.failing_spiff_task
self._update_task_model_with_spiff_task(failing_spiff_task) self._update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
db.session.bulk_save_objects(self.task_models.values()) db.session.bulk_save_objects(self.task_models.values())
db.session.bulk_save_objects(self.process_instance_events.values())
TaskService.insert_or_update_json_data_records(self.json_data_dicts) TaskService.insert_or_update_json_data_records(self.json_data_dicts)
@ -121,7 +125,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if json_data_dict is not None: if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask) -> TaskModel: def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask, task_failed: bool = False) -> TaskModel:
bpmn_process, task_model, new_task_models, new_json_data_dicts = ( bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, spiff_task,
@ -141,6 +145,18 @@ class TaskModelSavingDelegate(EngineStepDelegate):
json_data_dict_list.append(bpmn_process_json_data) json_data_dict_list.append(bpmn_process_json_data)
self._update_json_data_dicts_using_list(json_data_dict_list) self._update_json_data_dicts_using_list(json_data_dict_list)
if task_model.state == "COMPLETED" or task_failed:
event_type = "task_completed"
if task_failed:
event_type = "task_errored"
# FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
# which script tasks execute when READY.
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event = ProcessInstanceEventModel(
task_guid=task_model.guid, process_instance_id=self.process_instance.id, event_type=event_type, timestamp=timestamp)
self.process_instance_events[task_model.guid] = process_instance_event
return task_model return task_model

View File

@ -64,8 +64,8 @@ class TestLoggingService(BaseTest):
for log in logs: for log in logs:
assert log["process_instance_id"] == process_instance.id assert log["process_instance_id"] == process_instance.id
for key in [ for key in [
"start_in_seconds", "event_type",
"end_in_seconds", "timestamp",
"spiff_task_guid", "spiff_task_guid",
"bpmn_process_definition_identifier", "bpmn_process_definition_identifier",
"bpmn_process_definition_name", "bpmn_process_definition_name",

View File

@ -85,6 +85,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
tableRow.push( tableRow.push(
<> <>
<td>{row.bpmn_task_type}</td> <td>{row.bpmn_task_type}</td>
<td>{row.event_type}</td>
<td> <td>
{row.username || ( {row.username || (
<span className="system-user-log-entry">system</span> <span className="system-user-log-entry">system</span>
@ -99,7 +100,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
data-qa="process-instance-show-link" data-qa="process-instance-show-link"
to={`${processInstanceShowPageBaseUrl}/${row.process_instance_id}/${row.spiff_step}`} to={`${processInstanceShowPageBaseUrl}/${row.process_instance_id}/${row.spiff_step}`}
> >
{convertSecondsToFormattedDateTime(row.end_in_seconds)} {convertSecondsToFormattedDateTime(row.timestamp)}
</Link> </Link>
</td> </td>
); );
@ -132,6 +133,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
tableHeaders.push( tableHeaders.push(
<> <>
<th>Task Type</th> <th>Task Type</th>
<th>Event</th>
<th>User</th> <th>User</th>
</> </>
); );
@ -177,7 +179,7 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
setSearchParams(searchParams); setSearchParams(searchParams);
}} }}
> >
Simple Milestones
</Tab> </Tab>
<Tab <Tab
title="Show all logs for this process instance, and show extra columns that may be useful for debugging" title="Show all logs for this process instance, and show extra columns that may be useful for debugging"
@ -187,8 +189,18 @@ export default function ProcessInstanceLogList({ variant }: OwnProps) {
setSearchParams(searchParams); setSearchParams(searchParams);
}} }}
> >
Detailed Events
</Tab> </Tab>
{/*
Suspend
Resumed
Terminated
Skipped?
Rewind?
Execute?
Edit?
*/}
</TabList> </TabList>
</Tabs> </Tabs>
<br /> <br />

View File

@ -381,6 +381,40 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
</Column> </Column>
</Grid> </Grid>
{lastUpdatedTimeTag} {lastUpdatedTimeTag}
{/*
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Suspended at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:12:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Resumed at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:13:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Suspended at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:14:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Terminated at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:15:05 (by jason)
</Column>
</Grid>
*/}
<Grid condensed fullWidth> <Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title"> <Column sm={1} md={1} lg={2} className="grid-list-title">
Process model revision:{' '} Process model revision:{' '}
@ -400,6 +434,43 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
</Tag> </Tag>
</Column> </Column>
</Grid> </Grid>
{/*
<br />
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Suspended at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:12:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Resumed at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:13:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Suspended at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:14:05 (by jason)
</Column>
</Grid>
<Grid condensed fullWidth>
<Column sm={1} md={1} lg={2} className="grid-list-title">
Terminated at:{' '}
</Column>
<Column sm={3} md={3} lg={3} className="grid-date">
2023-03-17 10:15:05 (by jason)
</Column>
</Grid>
*/}
<br /> <br />
<Grid condensed fullWidth> <Grid condensed fullWidth>
<Column sm={2} md={2} lg={2}> <Column sm={2} md={2} lg={2}>