From 5bf37687ae83273d9bf8d7956a3008db5a6b3a8c Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Thu, 25 May 2023 10:30:01 -0400 Subject: [PATCH] Custom start event (#274) --- .../services/custom_parser.py | 4 + .../services/process_instance_processor.py | 4 + .../services/process_instance_service.py | 15 +- .../services/workflow_service.py | 39 +++++ .../spiffworkflow_backend/specs/__init__.py | 1 + .../specs/start_event.py | 62 ++++++++ .../unit/test_workflow_service.py | 135 ++++++++++++++++++ 7 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/specs/__init__.py create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/custom_parser.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/custom_parser.py index c54c195fd..20afce1fc 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/custom_parser.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/custom_parser.py @@ -2,9 +2,13 @@ from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore +from spiffworkflow_backend.specs.start_event import StartEvent + class MyCustomParser(BpmnDmnParser): # type: ignore """A BPMN and DMN parser that can also parse spiffworkflow-specific extensions.""" OVERRIDE_PARSER_CLASSES = BpmnDmnParser.OVERRIDE_PARSER_CLASSES OVERRIDE_PARSER_CLASSES.update(SpiffBpmnParser.OVERRIDE_PARSER_CLASSES) + + StartEvent.register_parser_class(OVERRIDE_PARSER_CLASSES) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 29cbab04a..d713af7d2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -101,7 +101,11 @@ from spiffworkflow_backend.services.workflow_execution_service import ( from spiffworkflow_backend.services.workflow_execution_service import ( WorkflowExecutionService, ) +from spiffworkflow_backend.specs.start_event import ( + StartEvent, +) +StartEvent.register_converter(SPIFF_SPEC_CONFIG) # Sorry about all this crap. I wanted to move this thing to another file, but # importing a bunch of types causes circular imports. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index ac9b10a3f..a2364a925 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -46,6 +46,7 @@ from spiffworkflow_backend.services.process_instance_queue_service import ( ProcessInstanceQueueService, ) from spiffworkflow_backend.services.process_model_service import ProcessModelService +from spiffworkflow_backend.services.workflow_service import WorkflowService class ProcessInstanceService: @@ -54,6 +55,17 @@ class ProcessInstanceService: FILE_DATA_DIGEST_PREFIX = "spifffiledatadigest+" TASK_STATE_LOCKED = "locked" + @staticmethod + def calculate_start_delay_in_seconds(process_instance_model: ProcessInstanceModel) -> int: + try: + processor = ProcessInstanceProcessor(process_instance_model) + delay_in_seconds = WorkflowService.calculate_run_at_delay_in_seconds( + processor.bpmn_process_instance, datetime.now(timezone.utc) + ) + except Exception: + delay_in_seconds = 0 + return delay_in_seconds + @classmethod def create_process_instance( cls, @@ -77,7 +89,8 @@ class ProcessInstanceService: ) db.session.add(process_instance_model) db.session.commit() - run_at_in_seconds = round(time.time()) + delay_in_seconds = cls.calculate_start_delay_in_seconds(process_instance_model) + run_at_in_seconds = round(time.time()) + delay_in_seconds ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds) return process_instance_model diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py new file mode 100644 index 000000000..9965484f5 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py @@ -0,0 +1,39 @@ +"""workflow_service.""" +from datetime import datetime + +from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore +from SpiffWorkflow.task import Task as SpiffTask # type: ignore +from SpiffWorkflow.task import TaskState + +from spiffworkflow_backend.specs.start_event import StartEvent + + +class WorkflowService: + """WorkflowService.""" + + @classmethod + def future_start_events(cls, workflow: BpmnWorkflow) -> list[SpiffTask]: + return [t for t in workflow.get_tasks(TaskState.FUTURE) if isinstance(t.task_spec, StartEvent)] + + @classmethod + def next_start_event_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int: + start_events = cls.future_start_events(workflow) + start_delays: list[int] = [] + for start_event in start_events: + start_delay = start_event.task_spec.start_delay_in_seconds(start_event, now_in_utc) + start_delays.append(start_delay) + start_delays.sort() + return start_delays[0] if len(start_delays) > 0 else 0 + + @classmethod + def calculate_run_at_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int: + # TODO: for now we are using the first start time because I am not sure how multiple + # start events should work. I think the right answer is to take the earliest start + # time and have later start events stay FUTURE/WAITING?, then we need to be able + # to respect the other start events when enqueue'ing. + # + # TODO: this method should also expand to include other FUTURE/WAITING timers when + # enqueue'ing so that we don't have to check timers every 10 or whatever seconds + # right now we assume that this is being called to create a process + + return cls.next_start_event_delay_in_seconds(workflow, now_in_utc) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/specs/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/specs/__init__.py new file mode 100644 index 000000000..734641eb2 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/specs/__init__.py @@ -0,0 +1 @@ +"""docstring.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py new file mode 100644 index 000000000..bcd5a14b1 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py @@ -0,0 +1,62 @@ +from datetime import datetime +from typing import Any +from typing import Dict + +from SpiffWorkflow.bpmn.parser.util import full_tag # type: ignore +from SpiffWorkflow.bpmn.serializer.task_spec import EventConverter # type: ignore +from SpiffWorkflow.bpmn.serializer.task_spec import StartEventConverter as DefaultStartEventConverter +from SpiffWorkflow.bpmn.specs.defaults import StartEvent as DefaultStartEvent # type: ignore +from SpiffWorkflow.bpmn.specs.event_definitions import CycleTimerEventDefinition # type: ignore +from SpiffWorkflow.bpmn.specs.event_definitions import DurationTimerEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions import TimeDateEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition +from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore +from SpiffWorkflow.task import Task as SpiffTask # type: ignore + + +# TODO: cylce timers and repeat counts? +class StartEvent(DefaultStartEvent): # type: ignore + def __init__(self, wf_spec, bpmn_id, event_definition, **kwargs): # type: ignore + if isinstance(event_definition, TimerEventDefinition): + super().__init__(wf_spec, bpmn_id, NoneEventDefinition(), **kwargs) + self.timer_definition = event_definition + else: + super().__init__(wf_spec, bpmn_id, event_definition, **kwargs) + self.timer_definition = None + + @staticmethod + def register_converter(spec_config: Dict[str, Any]) -> None: + spec_config["task_specs"].remove(DefaultStartEventConverter) + spec_config["task_specs"].append(StartEventConverter) + + @staticmethod + def register_parser_class(parser_config: Dict[str, Any]) -> None: + parser_config[full_tag("startEvent")] = (SpiffStartEventParser, StartEvent) + + def start_delay_in_seconds(self, my_task: SpiffTask, now_in_utc: datetime) -> int: + script_engine = my_task.workflow.script_engine + evaluated_expression = None + parsed_duration = None + + if isinstance(self.timer_definition, TimerEventDefinition) and script_engine is not None: + evaluated_expression = script_engine.evaluate(my_task, self.timer_definition.expression) + + if evaluated_expression is not None: + if isinstance(self.timer_definition, TimeDateEventDefinition): + parsed_duration = TimerEventDefinition.parse_time_or_duration(evaluated_expression) + time_delta = parsed_duration - now_in_utc + return time_delta.seconds # type: ignore + elif isinstance(self.timer_definition, DurationTimerEventDefinition): + parsed_duration = TimerEventDefinition.parse_iso_duration(evaluated_expression) + time_delta = TimerEventDefinition.get_timedelta_from_start(parsed_duration, now_in_utc) + return time_delta.seconds # type: ignore + elif isinstance(self.timer_definition, CycleTimerEventDefinition): + return 0 + + return 0 + + +class StartEventConverter(EventConverter): # type: ignore + def __init__(self, registry): # type: ignore + super().__init__(StartEvent, registry) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py new file mode 100644 index 000000000..c645c9a37 --- /dev/null +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py @@ -0,0 +1,135 @@ +"""Test_workflow_service.""" +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from typing import Generator + +import pytest +from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore +from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore +from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore +from tests.spiffworkflow_backend.helpers.base_test import BaseTest + +from spiffworkflow_backend.services.workflow_service import ( + WorkflowService, +) +from spiffworkflow_backend.specs.start_event import StartEvent + +BPMN_WRAPPER = """ + + {} + +""" + + +@pytest.fixture() +def now_in_utc() -> Generator[datetime, None, None]: + yield datetime.now(timezone.utc) + + +@pytest.fixture() +def example_start_datetime_in_utc_str() -> Generator[str, None, None]: + yield "2019-10-01T12:00:00+00:00" + + +@pytest.fixture() +def example_start_datetime_minus_5_mins_in_utc( + example_start_datetime_in_utc_str: str, +) -> Generator[datetime, None, None]: + example_datetime = datetime.fromisoformat(example_start_datetime_in_utc_str) + yield example_datetime - timedelta(minutes=5) + + +class CustomBpmnDmnParser(BpmnDmnParser): # type: ignore + OVERRIDE_PARSER_CLASSES = {} + OVERRIDE_PARSER_CLASSES.update(BpmnDmnParser.OVERRIDE_PARSER_CLASSES) + OVERRIDE_PARSER_CLASSES.update(SpiffBpmnParser.OVERRIDE_PARSER_CLASSES) + + StartEvent.register_parser_class(OVERRIDE_PARSER_CLASSES) + + +def workflow_from_str(bpmn_str: str, process_id: str) -> BpmnWorkflow: + parser = CustomBpmnDmnParser() + parser.add_bpmn_str(bpmn_str) + top_level = parser.get_spec(process_id) + subprocesses = parser.get_subprocess_specs(process_id) + return BpmnWorkflow(top_level, subprocesses) + + +def workflow_from_fragment(bpmn_fragment: str, process_id: str) -> BpmnWorkflow: + return workflow_from_str(BPMN_WRAPPER.format(bpmn_fragment), process_id) + + +class TestWorkflowService(BaseTest): + """TestWorkflowService.""" + + def test_run_at_delay_is_0_for_regular_start_events(self, now_in_utc: datetime) -> None: + workflow = workflow_from_fragment( + """ + + + Flow_184umot + + + Flow_184umot + + + + """, + "no_tasks", + ) + delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc) + assert delay == 0 + + def test_run_at_delay_is_30_for_30_second_duration_start_timer_event(self, now_in_utc: datetime) -> None: + workflow = workflow_from_fragment( + """ + + + Flow_1x1o335 + + "PT30S" + + + + + Flow_1x1o335 + + + """, + "Process_aldvgey", + ) + delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc) + assert delay == 30 + + def test_run_at_delay_is_300_if_5_mins_before_date_start_timer_event( + self, example_start_datetime_in_utc_str: str, example_start_datetime_minus_5_mins_in_utc: datetime + ) -> None: + workflow = workflow_from_fragment( + f""" + + + Flow_1x1o335 + + "{example_start_datetime_in_utc_str}" + + + + + Flow_1x1o335 + + + """, + "Process_aldvgey", + ) + delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, example_start_datetime_minus_5_mins_in_utc) + assert delay == 300