Strategy based engine step execution (#168)

This commit is contained in:
jbirddog 2023-03-09 11:38:18 -05:00 committed by GitHub
parent 2464ad9a26
commit 4ce715fec8
2 changed files with 312 additions and 83 deletions

View File

@ -44,8 +44,7 @@ from SpiffWorkflow.bpmn.specs.SubWorkflowTask import SubWorkflowTask # type: ig
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
from SpiffWorkflow.dmn.serializer.task_spec import BusinessRuleTaskConverter # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.exceptions import WorkflowException # type: ignore
from SpiffWorkflow.exceptions import WorkflowTaskException
from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore
@ -95,6 +94,15 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import (
execution_strategy_named,
)
from spiffworkflow_backend.services.workflow_execution_service import (
StepDetailLoggingDelegate,
)
from spiffworkflow_backend.services.workflow_execution_service import (
WorkflowExecutionService,
)
SPIFF_SPEC_CONFIG["task_specs"].append(BusinessRuleTaskConverter)
@ -1666,90 +1674,34 @@ class ProcessInstanceProcessor:
current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step
db.session.add(self.process_instance_model)
# TODO remove after done with the performance improvements
# to use delete the _ prefix here and add it to the real def below
def _do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
self._do_engine_steps(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
def do_engine_steps(
self,
exit_at: None = None,
save: bool = False,
execution_strategy_name: str = "greedy",
) -> None:
"""Do_engine_steps."""
step_details = []
tasks_to_log = {
"BPMN Task",
"Script Task",
"Service Task",
"Default Start Event",
"Exclusive Gateway",
"Call Activity",
# "End Join",
"End Event",
"Default Throwing Event",
"Subprocess",
"Transactional Subprocess",
}
def spiff_step_details_mapping_builder(
task: SpiffTask, start: float, end: float
) -> dict:
self._script_engine.environment.revise_state_with_task_data(task)
return self.spiff_step_details_mapping(task, start, end)
# making a dictionary to ensure we are not shadowing variables in the other methods
current_task_start_in_seconds = {}
def should_log(task: SpiffTask) -> bool:
if (
task.task_spec.spec_type in tasks_to_log
and not task.task_spec.name.endswith(".EndJoin")
):
return True
return False
def will_complete_task(task: SpiffTask) -> None:
if should_log(task):
current_task_start_in_seconds["time"] = time.time()
self.increment_spiff_step()
def did_complete_task(task: SpiffTask) -> None:
if should_log(task):
self._script_engine.environment.revise_state_with_task_data(task)
step_details.append(
self.spiff_step_details_mapping(
task, current_task_start_in_seconds["time"], time.time()
)
)
try:
self.bpmn_process_instance.refresh_waiting_tasks()
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=will_complete_task,
did_complete_task=did_complete_task,
)
if self.bpmn_process_instance.is_completed():
self._script_engine.environment.finalize_result(
self.bpmn_process_instance
)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:
# self.log_spiff_step_details(step_details)
db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details)
spiff_logger = logging.getLogger("spiff")
for handler in spiff_logger.handlers:
if hasattr(handler, "bulk_insert_logs"):
handler.bulk_insert_logs() # type: ignore
db.session.commit()
if save:
self.save()
step_delegate = StepDetailLoggingDelegate(
self.increment_spiff_step, spiff_step_details_mapping_builder
)
execution_strategy = execution_strategy_named(
execution_strategy_name, step_delegate
)
execution_service = WorkflowExecutionService(
self.bpmn_process_instance,
self.process_instance_model,
execution_strategy,
self._script_engine.environment.finalize_result,
self.save,
)
execution_service.do_engine_steps(exit_at, save)
# log the spiff step details so we know what is processing the process
# instance when a human task has a timer event.

View File

@ -0,0 +1,277 @@
import logging
import time
from typing import Callable
from typing import List
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
class EngineStepDelegate:
"""Interface of sorts for a concrete engine step delegate."""
def will_complete_task(self, task: SpiffTask) -> None:
pass
def did_complete_task(self, task: SpiffTask) -> None:
pass
def save(self) -> None:
pass
SpiffStepIncrementer = Callable[[], None]
SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict]
class StepDetailLoggingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of logging spiff step details.
This separates the concerns of step execution and step logging.
"""
def __init__(
self,
increment_spiff_step: SpiffStepIncrementer,
spiff_step_details_mapping: SpiffStepDetailsMappingBuilder,
):
"""__init__."""
self.increment_spiff_step = increment_spiff_step
self.spiff_step_details_mapping = spiff_step_details_mapping
self.step_details: List[dict] = []
self.current_task_start_in_seconds = 0.0
self.tasks_to_log = {
"BPMN Task",
"Script Task",
"Service Task",
"Default Start Event",
"Exclusive Gateway",
"Call Activity",
# "End Join",
"End Event",
"Default Throwing Event",
"Subprocess",
"Transactional Subprocess",
}
def should_log(self, task: SpiffTask) -> bool:
return (
task.task_spec.spec_type in self.tasks_to_log
and not task.task_spec.name.endswith(".EndJoin")
)
def will_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
self.current_task_start_in_seconds = time.time()
self.increment_spiff_step()
def did_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
self.step_details.append(
self.spiff_step_details_mapping(
task, self.current_task_start_in_seconds, time.time()
)
)
def save(self) -> None:
db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details)
db.session.commit()
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
"""__init__."""
self.delegate = delegate
def do_engine_steps(
self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None
) -> None:
pass
def save(self) -> None:
self.delegate.save()
class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine step without stopping."""
def do_engine_steps(
self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None
) -> None:
bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=self.delegate.will_complete_task,
did_complete_task=self.delegate.did_complete_task,
)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
"""For illustration purposes, not currently integrated.
Would allow the `run` from the UI to execute until a service task then
return (to an interstitial page). The background processor would then take over.
"""
def do_engine_steps(
self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None
) -> None:
engine_steps = list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
while engine_steps:
for task in engine_steps:
if task.task_spec.spec_type == "Service Task":
return
self.delegate.will_complete_task(task)
task.complete()
self.delegate.did_complete_task(task)
engine_steps = list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
def execution_strategy_named(
name: str, delegate: EngineStepDelegate
) -> ExecutionStrategy:
cls = {
"greedy": GreedyExecutionStrategy,
"run_until_service_task": RunUntilServiceTaskExecutionStrategy,
}[name]
return cls(delegate)
ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]
ProcessInstanceSaver = Callable[[], None]
class WorkflowExecutionService:
"""Provides the driver code for workflow execution."""
def __init__(
self,
bpmn_process_instance: BpmnWorkflow,
process_instance_model: ProcessInstanceModel,
execution_strategy: ExecutionStrategy,
process_instance_completer: ProcessInstanceCompleter,
process_instance_saver: ProcessInstanceSaver,
):
"""__init__."""
self.bpmn_process_instance = bpmn_process_instance
self.process_instance_model = process_instance_model
self.execution_strategy = execution_strategy
self.process_instance_completer = process_instance_completer
self.process_instance_saver = process_instance_saver
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
try:
self.bpmn_process_instance.refresh_waiting_tasks()
self.execution_strategy.do_engine_steps(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:
self.execution_strategy.save()
spiff_logger = logging.getLogger("spiff")
for handler in spiff_logger.handlers:
if hasattr(handler, "bulk_insert_logs"):
handler.bulk_insert_logs() # type: ignore
db.session.commit()
if save:
self.process_instance_saver()
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up
message_type="send",
name=bpmn_message.name,
payload=bpmn_message.payload,
correlation_keys=self.bpmn_process_instance.correlations,
)
db.session.add(message_instance)
db.session.commit()
def queue_waiting_receive_messages(self) -> None:
"""Queue_waiting_receive_messages."""
waiting_events = self.bpmn_process_instance.waiting_events()
waiting_message_events = filter(
lambda e: e["event_type"] == "Message", waiting_events
)
for event in waiting_message_events:
# Ensure we are only creating one message instance for each waiting message
if (
MessageInstanceModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
message_type="receive",
name=event["name"],
).count()
> 0
):
continue
# Create a new Message Instance
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id,
message_type="receive",
name=event["name"],
correlation_keys=self.bpmn_process_instance.correlations,
)
for correlation_property in event["value"]:
message_correlation = MessageInstanceCorrelationRuleModel(
message_instance_id=message_instance.id,
name=correlation_property.name,
retrieval_expression=correlation_property.retrieval_expression,
)
message_instance.correlation_rules.append(message_correlation)
db.session.add(message_instance)
db.session.commit()
class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service."""
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
super().do_engine_steps(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)