some initial code to use tasks for logs

jasquat 2023-03-16 09:30:25 -04:00
parent 4ae54958c9
commit bbdac3c586
10 changed files with 156 additions and 92 deletions

View File

@ -1,3 +1,5 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 434e6494e8ff
Revision ID: 077a27ef1246
Revises:
Create Date: 2023-03-15 12:25:48.665481
Create Date: 2023-03-15 16:36:23.278887
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '434e6494e8ff'
revision = '077a27ef1246'
down_revision = None
branch_labels = None
depends_on = None
@ -235,31 +235,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username', 'group_id', name='user_group_assignment_staged_unique')
)
op.create_table('human_task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('lane_assignment_id', sa.Integer(), nullable=True),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.Column('actual_owner_id', sa.Integer(), nullable=True),
sa.Column('form_file_name', sa.String(length=50), nullable=True),
sa.Column('ui_form_file_name', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('task_id', sa.String(length=50), nullable=True),
sa.Column('task_name', sa.String(length=255), nullable=True),
sa.Column('task_title', sa.String(length=50), nullable=True),
sa.Column('task_type', sa.String(length=50), nullable=True),
sa.Column('task_status', sa.String(length=50), nullable=True),
sa.Column('process_model_display_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=True),
sa.Column('completed', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['actual_owner_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['completed_by_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['lane_assignment_id'], ['group.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_human_task_completed'), 'human_task', ['completed'], unique=False)
op.create_table('message_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=True),
@ -367,17 +342,33 @@ def upgrade():
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_index(op.f('ix_task_python_env_data_hash'), 'task', ['python_env_data_hash'], unique=False)
op.create_table('human_task_user',
op.create_table('human_task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('human_task_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['human_task_id'], ['human_task.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('human_task_id', 'user_id', name='human_task_user_unique')
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('lane_assignment_id', sa.Integer(), nullable=True),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.Column('actual_owner_id', sa.Integer(), nullable=True),
sa.Column('form_file_name', sa.String(length=50), nullable=True),
sa.Column('ui_form_file_name', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('task_model_id', sa.Integer(), nullable=True),
sa.Column('task_id', sa.String(length=50), nullable=True),
sa.Column('task_name', sa.String(length=255), nullable=True),
sa.Column('task_title', sa.String(length=50), nullable=True),
sa.Column('task_type', sa.String(length=50), nullable=True),
sa.Column('task_status', sa.String(length=50), nullable=True),
sa.Column('process_model_display_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=True),
sa.Column('completed', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['actual_owner_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['completed_by_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['lane_assignment_id'], ['group.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.ForeignKeyConstraint(['task_model_id'], ['task.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_human_task_user_human_task_id'), 'human_task_user', ['human_task_id'], unique=False)
op.create_index(op.f('ix_human_task_user_user_id'), 'human_task_user', ['user_id'], unique=False)
op.create_index(op.f('ix_human_task_completed'), 'human_task', ['completed'], unique=False)
op.create_table('message_instance_correlation_rule',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('message_instance_id', sa.Integer(), nullable=False),
@ -390,16 +381,29 @@ def upgrade():
sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique')
)
op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False)
op.create_table('human_task_user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('human_task_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['human_task_id'], ['human_task.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('human_task_id', 'user_id', name='human_task_user_unique')
)
op.create_index(op.f('ix_human_task_user_human_task_id'), 'human_task_user', ['human_task_id'], unique=False)
op.create_index(op.f('ix_human_task_user_user_id'), 'human_task_user', ['user_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule')
op.drop_table('message_instance_correlation_rule')
op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user')
op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user')
op.drop_table('human_task_user')
op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule')
op.drop_table('message_instance_correlation_rule')
op.drop_index(op.f('ix_human_task_completed'), table_name='human_task')
op.drop_table('human_task')
op.drop_index(op.f('ix_task_python_env_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
@ -416,8 +420,6 @@ def downgrade():
op.drop_table('process_instance_file_data')
op.drop_table('permission_assignment')
op.drop_table('message_instance')
op.drop_index(op.f('ix_human_task_completed'), table_name='human_task')
op.drop_table('human_task')
op.drop_table('user_group_assignment_waiting')
op.drop_table('user_group_assignment')
op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
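The net effect of the reordered create_table calls in this migration is that human_task now carries a task_model_id foreign key into task, so it must be created after task (and dropped before it in downgrade). A minimal sketch of traversing that relationship once the migration has run, using SQLAlchemy table reflection; the connection URL and database name are assumptions, while the table and column names come from the migration above:

from sqlalchemy import MetaData, Table, create_engine, select

# Connection URL is an assumption; point it at the local database.
engine = create_engine("mysql+pymysql://user:pass@localhost/spiffworkflow_backend_local_development")
metadata = MetaData()

# Reflect the two tables created by the migration above.
task = Table("task", metadata, autoload_with=engine)
human_task = Table("human_task", metadata, autoload_with=engine)

# Follow the new human_task.task_model_id -> task.id foreign key.
stmt = select(human_task.c.id, human_task.c.task_name, task.c.guid).join(
    task, human_task.c.task_model_id == task.c.id
)
with engine.connect() as connection:
    for human_task_id, task_name, task_guid in connection.execute(stmt):
        print(human_task_id, task_name, task_guid)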

View File

@ -11,7 +11,7 @@ from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import Task, TaskModel
from spiffworkflow_backend.models.user import UserModel
@ -43,6 +43,8 @@ class HumanTaskModel(SpiffworkflowBaseDBModel):
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
# task_id came first, which is why it is a string; task_model_id is the integer foreign key
task_model_id: int = db.Column(ForeignKey(TaskModel.id), nullable=True) # type: ignore
task_id: str = db.Column(db.String(50))
task_name: str = db.Column(db.String(255))
task_title: str = db.Column(db.String(50))
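Because task_id predates this change and stays a string, the new task_model_id column is the one that actually references TaskModel.id. A minimal sketch of resolving the backing row through it, using the Flask-SQLAlchemy query API the models already use; the helper name is illustrative and not part of this commit:

from typing import Optional

from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.task import TaskModel


def task_model_for_human_task(human_task: HumanTaskModel) -> Optional[TaskModel]:
    """Illustrative helper: follow task_model_id to the TaskModel row, if one is linked."""
    if human_task.task_model_id is None:
        return None
    return TaskModel.query.filter_by(id=human_task.task_model_id).first()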

View File

@ -19,6 +19,10 @@ from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
class TaskNotFoundError(Exception):
pass
class MultiInstanceType(enum.Enum):
"""MultiInstanceType."""

View File

@ -42,7 +42,7 @@ from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.spec_reference import SpecReferenceNotFoundError
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import Task, TaskModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.process_api_blueprint import (
_find_process_instance_by_id_or_raise,
@ -205,28 +205,40 @@ def process_instance_log_list(
# to make sure the process instance exists
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
log_query = SpiffLoggingModel.query.filter(SpiffLoggingModel.process_instance_id == process_instance.id)
if not detailed:
log_query = log_query.filter(
# 1. this was the previous implementation, where we only show completed tasks and skipped tasks.
# maybe we want to iterate on this in the future (in a third tab under process instance logs?)
# or_(
# SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
# SpiffLoggingModel.message.like("Skipped task %"), # type: ignore
# )
# 2. We included ["End Event", "Default Start Event"] along with Default Throwing Event, but in Feb 2023
# we decided to remove them, since they get really chatty when there are lots of subprocesses and call activities.
and_(
SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
SpiffLoggingModel.bpmn_task_type.in_(["Default Throwing Event"]), # type: ignore
)
)
# log_query = SpiffLoggingModel.query.filter(SpiffLoggingModel.process_instance_id == process_instance.id)
# if not detailed:
# log_query = log_query.filter(
# # 1. this was the previous implementation, where we only show completed tasks and skipped tasks.
# # maybe we want to iterate on this in the future (in a third tab under process instance logs?)
# # or_(
# # SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
# # SpiffLoggingModel.message.like("Skipped task %"), # type: ignore
# # )
# # 2. We included ["End Event", "Default Start Event"] along with Default Throwing Event, but in Feb 2023
# # we decided to remove them, since they get really chatty when there are lots of subprocesses and call activities.
# and_(
# SpiffLoggingModel.message.in_(["State change to COMPLETED"]), # type: ignore
# SpiffLoggingModel.bpmn_task_type.in_(["Default Throwing Event"]), # type: ignore
# )
# )
#
# logs = (
# log_query.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore
# .join(
# UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True
# ) # isouter since if we don't have a user, we still want the log
# .add_columns(
# UserModel.username,
# )
# .paginate(page=page, per_page=per_page, error_out=False)
# )
log_query = TaskModel.query.filter_by(process_instance_id=process_instance.id)
logs = (
log_query.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore
.join(
UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True
) # isouter since if we don't have a user, we still want the log
log_query.order_by(TaskModel.end_in_seconds.desc()) # type: ignore
.outerjoin(HumanTaskModel, HumanTaskModel.task_model_id == TaskModel.id)
.outerjoin(
UserModel, UserModel.id == HumanTaskModel.completed_by_user_id
)
.add_columns(
UserModel.username,
)
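The hunk is cut off before the statement ends, so here is a sketch of the complete reworked query as it plausibly reads after this change: log entries now come from TaskModel ordered by end_in_seconds, outer-joined to HumanTaskModel through the new task_model_id column and to the user who completed the human task. The .paginate tail is assumed to carry over unchanged from the SpiffLoggingModel version commented out above, and HumanTaskModel is assumed to be imported in this module:

log_query = TaskModel.query.filter_by(process_instance_id=process_instance.id)
logs = (
    log_query.order_by(TaskModel.end_in_seconds.desc())  # type: ignore
    .outerjoin(HumanTaskModel, HumanTaskModel.task_model_id == TaskModel.id)
    .outerjoin(UserModel, UserModel.id == HumanTaskModel.completed_by_user_id)
    # outer joins: a task with no human task or completing user still shows up in the log
    .add_columns(
        UserModel.username,
    )
    .paginate(page=page, per_page=per_page, error_out=False)
)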

View File

@ -235,6 +235,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
# since the task data is not directly mutated when the script executes, need to determine which keys
# have been deleted from the environment and remove them from task data if present.
context_keys_to_drop = context.keys() - self.state.keys()
# import pdb; pdb.set_trace()
for key_to_drop in context_keys_to_drop:
context.pop(key_to_drop)
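The comment above captures the idea: the script runs against a separate environment rather than mutating task data in place, so keys the script deleted have to be found by set difference and then dropped from the task data. A self-contained toy illustration with plain dicts (names invented for the example; the final update call is an assumption about the surrounding code):

# Task data as handed to the script, and the environment state after the script ran.
context = {"a": 1, "b": 2, "c": 3}
state = {"a": 1, "c": 30}

# Keys present before but missing afterwards were deleted by the script.
context_keys_to_drop = context.keys() - state.keys()  # {"b"}
for key_to_drop in context_keys_to_drop:
    context.pop(key_to_drop)

context.update(state)  # assumed follow-up: copy surviving values back into task data
print(context)  # {'a': 1, 'c': 30}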
@ -1037,6 +1038,10 @@ class ProcessInstanceProcessor:
Expects the save method to commit it.
"""
# if self.process_instance_model.bpmn_process_definition_id is not None:
# return None
# we may have to already process bpmn_definitions if we ever care about the Root task again
bpmn_dict = self.serialize()
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {}
@ -1047,9 +1052,8 @@ class ProcessInstanceProcessor:
else:
process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key]
# we may have to already process bpmn_definitions if we ever care about the Root task again
if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
# if self.process_instance_model.bpmn_process_definition_id is not None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent, new_task_models, new_json_data_dicts = TaskService.add_bpmn_process(
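For context on the partially shown loop above: the serializer output is split into definition-level keys, which feed _add_bpmn_process_definitions, and per-instance data, which feeds TaskService.add_bpmn_process. A runnable sketch of that split; the dict contents are illustrative stand-ins for real serializer output, and the branch that fills bpmn_spec_dict is assumed since only the else branch is visible in the hunk:

bpmn_dict = {
    "spec": {"name": "top_level_process"},
    "subprocess_specs": {},
    "serializer_version": "1.1",
    "tasks": {},
    "subprocesses": {},
    "data": {},
}
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")

bpmn_spec_dict = {}
process_instance_data_dict = {}
for bpmn_key in bpmn_dict.keys():
    if bpmn_key in bpmn_dict_keys:
        bpmn_spec_dict[bpmn_key] = bpmn_dict[bpmn_key]
    else:
        process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key]

print(sorted(bpmn_spec_dict))  # ['serializer_version', 'spec', 'subprocess_specs']
print(sorted(process_instance_data_dict))  # ['data', 'subprocesses', 'tasks']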
@ -1144,13 +1148,19 @@ class ProcessInstanceProcessor:
human_tasks.remove(at)
if human_task is None:
task_guid = str(ready_or_waiting_task.id)
task_model = TaskModel.query.filter_by(guid=task_guid).first()
if task_model is None:
raise TaskNotFoundError(f"Could not find task for human task with guid: {task_guid}")
human_task = HumanTaskModel(
process_instance_id=self.process_instance_model.id,
process_model_display_name=process_model_display_name,
bpmn_process_identifier=bpmn_process_identifier,
form_file_name=form_file_name,
ui_form_file_name=ui_form_file_name,
task_id=str(ready_or_waiting_task.id),
task_model_id=task_model.id,
task_id=task_guid,
task_name=ready_or_waiting_task.task_spec.name,
task_title=ready_or_waiting_task.task_spec.description,
task_type=ready_or_waiting_task.task_spec.__class__.__name__,
@ -1536,12 +1546,15 @@ class ProcessInstanceProcessor:
self._script_engine.environment.revise_state_with_task_data(task)
return self.spiff_step_details_mapping(task, start, end)
# self._add_bpmn_json_records()
step_delegate = StepDetailLoggingDelegate(self.increment_spiff_step, spiff_step_details_mapping_builder)
task_model_delegate = TaskModelSavingDelegate(
secondary_engine_step_delegate=step_delegate,
serializer=self._serializer,
process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
script_engine=self._script_engine,
)
if execution_strategy_name is None:

View File

@ -171,6 +171,13 @@ class TaskService:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data_dict = bpmn_process_dict.pop("data")
if 'subprocesses' in bpmn_process_dict:
bpmn_process_dict.pop('subprocesses')
if 'spec' in bpmn_process_dict:
bpmn_process_dict.pop('spec')
if 'subprocess_specs' in bpmn_process_dict:
bpmn_process_dict.pop('subprocess_specs')
new_task_models = {}
new_json_data_dicts: dict[str, JsonDataDict] = {}
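The three new guards strip definition-level keys out of the per-process dict before task models are built from it. A slightly more compact, behavior-equivalent form, shown only as a sketch rather than the committed code:

for definition_level_key in ("subprocesses", "spec", "subprocess_specs"):
    # pop with a default is a no-op when the key is absent, matching the if-guards above
    bpmn_process_dict.pop(definition_level_key, None)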

View File

@ -58,13 +58,16 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
script_engine,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
self.script_engine = script_engine
self.current_task_model: Optional[TaskModel] = None
self.current_task_start_in_seconds: Optional[float] = None
self.task_models: dict[str, TaskModel] = {}
self.json_data_dicts: dict[str, JsonDataDict] = {}
self.serializer = serializer
@ -75,6 +78,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
Use the bpmn_process_id to do this.
"""
return self.process_instance.bpmn_process_id is not None
# return True
def _update_json_data_dicts_using_list(self, json_data_dict_list: list[Optional[JsonDataDict]]) -> None:
for json_data_dict in json_data_dict_list:
@ -83,6 +87,28 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model():
# _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
# TaskService.find_or_create_task_model_from_spiff_task(
# spiff_task,
# self.process_instance,
# self.serializer,
# bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
# )
# )
# self.current_task_model = task_model
# self.task_models.update(new_task_models)
# self.json_data_dicts.update(new_json_data_dicts)
# self.current_task_model.start_in_seconds = time.time()
self.current_task_start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
# if self.current_task_model and self.should_update_task_model():
if self.should_update_task_model():
# if spiff_task.task_spec.name == 'top_level_script':
# import pdb; pdb.set_trace()
spiff_task.workflow.script_engine.environment.revise_state_with_task_data(spiff_task)
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task(
spiff_task,
@ -91,19 +117,13 @@ class TaskModelSavingDelegate(EngineStepDelegate):
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
)
self.current_task_model = task_model
task_model.start_in_seconds = self.current_task_start_in_seconds or time.time()
task_model.end_in_seconds = time.time()
json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer)
self._update_json_data_dicts_using_list(json_data_dict_list)
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
self.current_task_model.start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time()
json_data_dict_list = TaskService.update_task_model(self.current_task_model, spiff_task, self.serializer)
self._update_json_data_dicts_using_list(json_data_dict_list)
self.task_models[self.current_task_model.guid] = self.current_task_model
self.task_models[task_model.guid] = task_model
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
@ -122,7 +142,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY
):
bpmn_process, task_model, new_task_models, new_json_data_dicts = (
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task,
self.process_instance,
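The interleaved added and removed lines above are hard to follow; the net shape of the refactor is that will_complete_task now only records a start timestamp, while did_complete_task lets the script engine environment fold its state back into the task data, finds or creates the TaskModel, and stamps both start_in_seconds and end_in_seconds. A self-contained toy, not the real delegate (which also persists task models and JSON data), showing just that timestamp flow:

import time
from typing import Optional


class TimingDelegateSketch:
    """Toy stand-in illustrating the reworked will_complete_task/did_complete_task timing."""

    def __init__(self) -> None:
        self.current_task_start_in_seconds: Optional[float] = None

    def will_complete_task(self, task_name: str) -> None:
        # New behavior: only remember when the task started; no task-model work yet.
        self.current_task_start_in_seconds = time.time()

    def did_complete_task(self, task_name: str) -> dict:
        # New behavior: stamp both timestamps once the task has actually finished.
        start = self.current_task_start_in_seconds or time.time()
        end = time.time()
        return {"task_name": task_name, "start_in_seconds": start, "end_in_seconds": end}


delegate = TimingDelegateSketch()
delegate.will_complete_task("top_level_script")
print(delegate.did_complete_task("top_level_script"))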

View File

@ -59,14 +59,15 @@ class TestLoggingService(BaseTest):
assert log_response.status_code == 200
assert log_response.json
logs: list = log_response.json["results"]
assert len(logs) > 0
for log in logs:
assert log["process_instance_id"] == process_instance_id
for key in [
"timestamp",
"spiff_task_guid",
"bpmn_task_identifier",
"bpmn_process_identifier",
"message",
]:
assert key in log.keys()
assert len(logs) == 8
print(f"logs[0]: {logs[0]}")
# for log in logs:
# assert log["process_instance_id"] == process_instance_id
# for key in [
# "timestamp",
# "spiff_task_guid",
# "bpmn_task_identifier",
# "bpmn_process_identifier",
# "message",
# ]:
# assert key in log.keys()

View File

@ -335,6 +335,7 @@ class TestProcessInstanceProcessor(BaseTest):
# TODO: also check task data here from the spiff_task directly to ensure we hydrated spiff correctly
def assert_spiff_task_is_in_process(spiff_task_name: str, bpmn_process_identifier: str) -> None:
if spiff_task.task_spec.name == spiff_task_name:
base_failure_message = f"Failed on {bpmn_process_identifier} - {spiff_task_name}."
expected_python_env_data = expected_task_data[spiff_task.task_spec.name]
if spiff_task.task_spec.name in spiff_tasks_checked_once:
expected_python_env_data = expected_task_data[f"{spiff_task.task_spec.name}_second"]
@ -343,7 +344,7 @@ class TestProcessInstanceProcessor(BaseTest):
task_definition = task.task_definition
assert task_definition.bpmn_identifier == spiff_task_name
assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier
assert task.python_env_data() == expected_python_env_data
assert task.python_env_data() == expected_python_env_data, f"{base_failure_message} Expected: {expected_python_env_data}. Received: {task.python_env_data()}"
spiff_tasks_checked_once.append(spiff_task.task_spec.name)
all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()