diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 67c786b13..30f8a481c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -44,8 +44,7 @@ from SpiffWorkflow.bpmn.specs.SubWorkflowTask import SubWorkflowTask # type: ig from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore from SpiffWorkflow.dmn.serializer.task_spec import BusinessRuleTaskConverter # type: ignore -from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore -from SpiffWorkflow.exceptions import WorkflowException +from SpiffWorkflow.exceptions import WorkflowException # type: ignore from SpiffWorkflow.exceptions import WorkflowTaskException from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore @@ -95,6 +94,15 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate from spiffworkflow_backend.services.spec_file_service import SpecFileService from spiffworkflow_backend.services.user_service import UserService +from spiffworkflow_backend.services.workflow_execution_service import ( + execution_strategy_named, +) +from spiffworkflow_backend.services.workflow_execution_service import ( + StepDetailLoggingDelegate, +) +from spiffworkflow_backend.services.workflow_execution_service import ( + WorkflowExecutionService, +) SPIFF_SPEC_CONFIG["task_specs"].append(BusinessRuleTaskConverter) @@ -1666,90 +1674,34 @@ class ProcessInstanceProcessor: current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step db.session.add(self.process_instance_model) - # TODO remove after done with the performance improvements - # to use delete the _ prefix here and add it to the real def below - def _do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: - """__do_engine_steps.""" - import cProfile - from pstats import SortKey - - with cProfile.Profile() as pr: - self._do_engine_steps(exit_at=exit_at, save=save) - pr.print_stats(sort=SortKey.CUMULATIVE) - - def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: + def do_engine_steps( + self, + exit_at: None = None, + save: bool = False, + execution_strategy_name: str = "greedy", + ) -> None: """Do_engine_steps.""" - step_details = [] - tasks_to_log = { - "BPMN Task", - "Script Task", - "Service Task", - "Default Start Event", - "Exclusive Gateway", - "Call Activity", - # "End Join", - "End Event", - "Default Throwing Event", - "Subprocess", - "Transactional Subprocess", - } + def spiff_step_details_mapping_builder( + task: SpiffTask, start: float, end: float + ) -> dict: + self._script_engine.environment.revise_state_with_task_data(task) + return self.spiff_step_details_mapping(task, start, end) - # making a dictionary to ensure we are not shadowing variables in the other methods - current_task_start_in_seconds = {} - - def should_log(task: SpiffTask) -> bool: - if ( - task.task_spec.spec_type in tasks_to_log - and not task.task_spec.name.endswith(".EndJoin") - ): - return True - return False - - def will_complete_task(task: SpiffTask) -> None: - if should_log(task): - current_task_start_in_seconds["time"] = time.time() - self.increment_spiff_step() - - def did_complete_task(task: SpiffTask) -> None: - if should_log(task): - self._script_engine.environment.revise_state_with_task_data(task) - step_details.append( - self.spiff_step_details_mapping( - task, current_task_start_in_seconds["time"], time.time() - ) - ) - - try: - self.bpmn_process_instance.refresh_waiting_tasks() - - self.bpmn_process_instance.do_engine_steps( - exit_at=exit_at, - will_complete_task=will_complete_task, - did_complete_task=did_complete_task, - ) - - if self.bpmn_process_instance.is_completed(): - self._script_engine.environment.finalize_result( - self.bpmn_process_instance - ) - - self.process_bpmn_messages() - self.queue_waiting_receive_messages() - except SpiffWorkflowException as swe: - raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe - - finally: - # self.log_spiff_step_details(step_details) - db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details) - spiff_logger = logging.getLogger("spiff") - for handler in spiff_logger.handlers: - if hasattr(handler, "bulk_insert_logs"): - handler.bulk_insert_logs() # type: ignore - db.session.commit() - - if save: - self.save() + step_delegate = StepDetailLoggingDelegate( + self.increment_spiff_step, spiff_step_details_mapping_builder + ) + execution_strategy = execution_strategy_named( + execution_strategy_name, step_delegate + ) + execution_service = WorkflowExecutionService( + self.bpmn_process_instance, + self.process_instance_model, + execution_strategy, + self._script_engine.environment.finalize_result, + self.save, + ) + execution_service.do_engine_steps(exit_at, save) # log the spiff step details so we know what is processing the process # instance when a human task has a timer event. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py new file mode 100644 index 000000000..576dee1ba --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -0,0 +1,277 @@ +import logging +import time +from typing import Callable +from typing import List + +from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore +from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore +from SpiffWorkflow.task import Task as SpiffTask # type: ignore +from SpiffWorkflow.task import TaskState + +from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.message_instance import MessageInstanceModel +from spiffworkflow_backend.models.message_instance_correlation import ( + MessageInstanceCorrelationRuleModel, +) +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel + + +class EngineStepDelegate: + """Interface of sorts for a concrete engine step delegate.""" + + def will_complete_task(self, task: SpiffTask) -> None: + pass + + def did_complete_task(self, task: SpiffTask) -> None: + pass + + def save(self) -> None: + pass + + +SpiffStepIncrementer = Callable[[], None] +SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict] + + +class StepDetailLoggingDelegate(EngineStepDelegate): + """Engine step delegate that takes care of logging spiff step details. + + This separates the concerns of step execution and step logging. + """ + + def __init__( + self, + increment_spiff_step: SpiffStepIncrementer, + spiff_step_details_mapping: SpiffStepDetailsMappingBuilder, + ): + """__init__.""" + self.increment_spiff_step = increment_spiff_step + self.spiff_step_details_mapping = spiff_step_details_mapping + self.step_details: List[dict] = [] + self.current_task_start_in_seconds = 0.0 + self.tasks_to_log = { + "BPMN Task", + "Script Task", + "Service Task", + "Default Start Event", + "Exclusive Gateway", + "Call Activity", + # "End Join", + "End Event", + "Default Throwing Event", + "Subprocess", + "Transactional Subprocess", + } + + def should_log(self, task: SpiffTask) -> bool: + return ( + task.task_spec.spec_type in self.tasks_to_log + and not task.task_spec.name.endswith(".EndJoin") + ) + + def will_complete_task(self, task: SpiffTask) -> None: + if self.should_log(task): + self.current_task_start_in_seconds = time.time() + self.increment_spiff_step() + + def did_complete_task(self, task: SpiffTask) -> None: + if self.should_log(task): + self.step_details.append( + self.spiff_step_details_mapping( + task, self.current_task_start_in_seconds, time.time() + ) + ) + + def save(self) -> None: + db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details) + db.session.commit() + + +class ExecutionStrategy: + """Interface of sorts for a concrete execution strategy.""" + + def __init__(self, delegate: EngineStepDelegate): + """__init__.""" + self.delegate = delegate + + def do_engine_steps( + self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None + ) -> None: + pass + + def save(self) -> None: + self.delegate.save() + + +class GreedyExecutionStrategy(ExecutionStrategy): + """The common execution strategy. This will greedily run all engine step without stopping.""" + + def do_engine_steps( + self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None + ) -> None: + bpmn_process_instance.do_engine_steps( + exit_at=exit_at, + will_complete_task=self.delegate.will_complete_task, + did_complete_task=self.delegate.did_complete_task, + ) + + +class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): + """For illustration purposes, not currently integrated. + + Would allow the `run` from the UI to execute until a service task then + return (to an interstitial page). The background processor would then take over. + """ + + def do_engine_steps( + self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None + ) -> None: + engine_steps = list( + [ + t + for t in bpmn_process_instance.get_tasks(TaskState.READY) + if bpmn_process_instance._is_engine_task(t.task_spec) + ] + ) + while engine_steps: + for task in engine_steps: + if task.task_spec.spec_type == "Service Task": + return + self.delegate.will_complete_task(task) + task.complete() + self.delegate.did_complete_task(task) + + engine_steps = list( + [ + t + for t in bpmn_process_instance.get_tasks(TaskState.READY) + if bpmn_process_instance._is_engine_task(t.task_spec) + ] + ) + + +def execution_strategy_named( + name: str, delegate: EngineStepDelegate +) -> ExecutionStrategy: + cls = { + "greedy": GreedyExecutionStrategy, + "run_until_service_task": RunUntilServiceTaskExecutionStrategy, + }[name] + + return cls(delegate) + + +ProcessInstanceCompleter = Callable[[BpmnWorkflow], None] +ProcessInstanceSaver = Callable[[], None] + + +class WorkflowExecutionService: + """Provides the driver code for workflow execution.""" + + def __init__( + self, + bpmn_process_instance: BpmnWorkflow, + process_instance_model: ProcessInstanceModel, + execution_strategy: ExecutionStrategy, + process_instance_completer: ProcessInstanceCompleter, + process_instance_saver: ProcessInstanceSaver, + ): + """__init__.""" + self.bpmn_process_instance = bpmn_process_instance + self.process_instance_model = process_instance_model + self.execution_strategy = execution_strategy + self.process_instance_completer = process_instance_completer + self.process_instance_saver = process_instance_saver + + def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: + """Do_engine_steps.""" + try: + self.bpmn_process_instance.refresh_waiting_tasks() + + self.execution_strategy.do_engine_steps(self.bpmn_process_instance, exit_at) + + if self.bpmn_process_instance.is_completed(): + self.process_instance_completer(self.bpmn_process_instance) + + self.process_bpmn_messages() + self.queue_waiting_receive_messages() + except SpiffWorkflowException as swe: + raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe + + finally: + self.execution_strategy.save() + spiff_logger = logging.getLogger("spiff") + for handler in spiff_logger.handlers: + if hasattr(handler, "bulk_insert_logs"): + handler.bulk_insert_logs() # type: ignore + db.session.commit() + + if save: + self.process_instance_saver() + + def process_bpmn_messages(self) -> None: + """Process_bpmn_messages.""" + bpmn_messages = self.bpmn_process_instance.get_bpmn_messages() + for bpmn_message in bpmn_messages: + message_instance = MessageInstanceModel( + process_instance_id=self.process_instance_model.id, + user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up + message_type="send", + name=bpmn_message.name, + payload=bpmn_message.payload, + correlation_keys=self.bpmn_process_instance.correlations, + ) + db.session.add(message_instance) + db.session.commit() + + def queue_waiting_receive_messages(self) -> None: + """Queue_waiting_receive_messages.""" + waiting_events = self.bpmn_process_instance.waiting_events() + waiting_message_events = filter( + lambda e: e["event_type"] == "Message", waiting_events + ) + + for event in waiting_message_events: + # Ensure we are only creating one message instance for each waiting message + if ( + MessageInstanceModel.query.filter_by( + process_instance_id=self.process_instance_model.id, + message_type="receive", + name=event["name"], + ).count() + > 0 + ): + continue + + # Create a new Message Instance + message_instance = MessageInstanceModel( + process_instance_id=self.process_instance_model.id, + user_id=self.process_instance_model.process_initiator_id, + message_type="receive", + name=event["name"], + correlation_keys=self.bpmn_process_instance.correlations, + ) + for correlation_property in event["value"]: + message_correlation = MessageInstanceCorrelationRuleModel( + message_instance_id=message_instance.id, + name=correlation_property.name, + retrieval_expression=correlation_property.retrieval_expression, + ) + message_instance.correlation_rules.append(message_correlation) + db.session.add(message_instance) + db.session.commit() + + +class ProfiledWorkflowExecutionService(WorkflowExecutionService): + """A profiled version of the workflow execution service.""" + + def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: + """__do_engine_steps.""" + import cProfile + from pstats import SortKey + + with cProfile.Profile() as pr: + super().do_engine_steps(exit_at=exit_at, save=save) + pr.print_stats(sort=SortKey.CUMULATIVE)