Custom start event (#274)
This commit is contained in:
parent
4ea129caa7
commit
5bf37687ae
|
@ -2,9 +2,13 @@
|
||||||
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
|
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
|
||||||
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
|
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
|
||||||
|
|
||||||
|
from spiffworkflow_backend.specs.start_event import StartEvent
|
||||||
|
|
||||||
|
|
||||||
class MyCustomParser(BpmnDmnParser): # type: ignore
|
class MyCustomParser(BpmnDmnParser): # type: ignore
|
||||||
"""A BPMN and DMN parser that can also parse spiffworkflow-specific extensions."""
|
"""A BPMN and DMN parser that can also parse spiffworkflow-specific extensions."""
|
||||||
|
|
||||||
OVERRIDE_PARSER_CLASSES = BpmnDmnParser.OVERRIDE_PARSER_CLASSES
|
OVERRIDE_PARSER_CLASSES = BpmnDmnParser.OVERRIDE_PARSER_CLASSES
|
||||||
OVERRIDE_PARSER_CLASSES.update(SpiffBpmnParser.OVERRIDE_PARSER_CLASSES)
|
OVERRIDE_PARSER_CLASSES.update(SpiffBpmnParser.OVERRIDE_PARSER_CLASSES)
|
||||||
|
|
||||||
|
StartEvent.register_parser_class(OVERRIDE_PARSER_CLASSES)
|
||||||
|
|
|
@ -101,7 +101,11 @@ from spiffworkflow_backend.services.workflow_execution_service import (
|
||||||
from spiffworkflow_backend.services.workflow_execution_service import (
|
from spiffworkflow_backend.services.workflow_execution_service import (
|
||||||
WorkflowExecutionService,
|
WorkflowExecutionService,
|
||||||
)
|
)
|
||||||
|
from spiffworkflow_backend.specs.start_event import (
|
||||||
|
StartEvent,
|
||||||
|
)
|
||||||
|
|
||||||
|
StartEvent.register_converter(SPIFF_SPEC_CONFIG)
|
||||||
|
|
||||||
# Sorry about all this crap. I wanted to move this thing to another file, but
|
# Sorry about all this crap. I wanted to move this thing to another file, but
|
||||||
# importing a bunch of types causes circular imports.
|
# importing a bunch of types causes circular imports.
|
||||||
|
|
|
@ -46,6 +46,7 @@ from spiffworkflow_backend.services.process_instance_queue_service import (
|
||||||
ProcessInstanceQueueService,
|
ProcessInstanceQueueService,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||||
|
from spiffworkflow_backend.services.workflow_service import WorkflowService
|
||||||
|
|
||||||
|
|
||||||
class ProcessInstanceService:
|
class ProcessInstanceService:
|
||||||
|
@ -54,6 +55,17 @@ class ProcessInstanceService:
|
||||||
FILE_DATA_DIGEST_PREFIX = "spifffiledatadigest+"
|
FILE_DATA_DIGEST_PREFIX = "spifffiledatadigest+"
|
||||||
TASK_STATE_LOCKED = "locked"
|
TASK_STATE_LOCKED = "locked"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def calculate_start_delay_in_seconds(process_instance_model: ProcessInstanceModel) -> int:
|
||||||
|
try:
|
||||||
|
processor = ProcessInstanceProcessor(process_instance_model)
|
||||||
|
delay_in_seconds = WorkflowService.calculate_run_at_delay_in_seconds(
|
||||||
|
processor.bpmn_process_instance, datetime.now(timezone.utc)
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
delay_in_seconds = 0
|
||||||
|
return delay_in_seconds
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_process_instance(
|
def create_process_instance(
|
||||||
cls,
|
cls,
|
||||||
|
@ -77,7 +89,8 @@ class ProcessInstanceService:
|
||||||
)
|
)
|
||||||
db.session.add(process_instance_model)
|
db.session.add(process_instance_model)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
run_at_in_seconds = round(time.time())
|
delay_in_seconds = cls.calculate_start_delay_in_seconds(process_instance_model)
|
||||||
|
run_at_in_seconds = round(time.time()) + delay_in_seconds
|
||||||
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
|
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
|
||||||
return process_instance_model
|
return process_instance_model
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
"""workflow_service."""
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||||
|
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||||
|
from SpiffWorkflow.task import TaskState
|
||||||
|
|
||||||
|
from spiffworkflow_backend.specs.start_event import StartEvent
|
||||||
|
|
||||||
|
|
||||||
|
class WorkflowService:
|
||||||
|
"""WorkflowService."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def future_start_events(cls, workflow: BpmnWorkflow) -> list[SpiffTask]:
|
||||||
|
return [t for t in workflow.get_tasks(TaskState.FUTURE) if isinstance(t.task_spec, StartEvent)]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def next_start_event_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int:
|
||||||
|
start_events = cls.future_start_events(workflow)
|
||||||
|
start_delays: list[int] = []
|
||||||
|
for start_event in start_events:
|
||||||
|
start_delay = start_event.task_spec.start_delay_in_seconds(start_event, now_in_utc)
|
||||||
|
start_delays.append(start_delay)
|
||||||
|
start_delays.sort()
|
||||||
|
return start_delays[0] if len(start_delays) > 0 else 0
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def calculate_run_at_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int:
|
||||||
|
# TODO: for now we are using the first start time because I am not sure how multiple
|
||||||
|
# start events should work. I think the right answer is to take the earliest start
|
||||||
|
# time and have later start events stay FUTURE/WAITING?, then we need to be able
|
||||||
|
# to respect the other start events when enqueue'ing.
|
||||||
|
#
|
||||||
|
# TODO: this method should also expand to include other FUTURE/WAITING timers when
|
||||||
|
# enqueue'ing so that we don't have to check timers every 10 or whatever seconds
|
||||||
|
# right now we assume that this is being called to create a process
|
||||||
|
|
||||||
|
return cls.next_start_event_delay_in_seconds(workflow, now_in_utc)
|
|
@ -0,0 +1 @@
|
||||||
|
"""docstring."""
|
|
@ -0,0 +1,62 @@
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from SpiffWorkflow.bpmn.parser.util import full_tag # type: ignore
|
||||||
|
from SpiffWorkflow.bpmn.serializer.task_spec import EventConverter # type: ignore
|
||||||
|
from SpiffWorkflow.bpmn.serializer.task_spec import StartEventConverter as DefaultStartEventConverter
|
||||||
|
from SpiffWorkflow.bpmn.specs.defaults import StartEvent as DefaultStartEvent # type: ignore
|
||||||
|
from SpiffWorkflow.bpmn.specs.event_definitions import CycleTimerEventDefinition # type: ignore
|
||||||
|
from SpiffWorkflow.bpmn.specs.event_definitions import DurationTimerEventDefinition
|
||||||
|
from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition
|
||||||
|
from SpiffWorkflow.bpmn.specs.event_definitions import TimeDateEventDefinition
|
||||||
|
from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition
|
||||||
|
from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore
|
||||||
|
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: cylce timers and repeat counts?
|
||||||
|
class StartEvent(DefaultStartEvent): # type: ignore
|
||||||
|
def __init__(self, wf_spec, bpmn_id, event_definition, **kwargs): # type: ignore
|
||||||
|
if isinstance(event_definition, TimerEventDefinition):
|
||||||
|
super().__init__(wf_spec, bpmn_id, NoneEventDefinition(), **kwargs)
|
||||||
|
self.timer_definition = event_definition
|
||||||
|
else:
|
||||||
|
super().__init__(wf_spec, bpmn_id, event_definition, **kwargs)
|
||||||
|
self.timer_definition = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def register_converter(spec_config: Dict[str, Any]) -> None:
|
||||||
|
spec_config["task_specs"].remove(DefaultStartEventConverter)
|
||||||
|
spec_config["task_specs"].append(StartEventConverter)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def register_parser_class(parser_config: Dict[str, Any]) -> None:
|
||||||
|
parser_config[full_tag("startEvent")] = (SpiffStartEventParser, StartEvent)
|
||||||
|
|
||||||
|
def start_delay_in_seconds(self, my_task: SpiffTask, now_in_utc: datetime) -> int:
|
||||||
|
script_engine = my_task.workflow.script_engine
|
||||||
|
evaluated_expression = None
|
||||||
|
parsed_duration = None
|
||||||
|
|
||||||
|
if isinstance(self.timer_definition, TimerEventDefinition) and script_engine is not None:
|
||||||
|
evaluated_expression = script_engine.evaluate(my_task, self.timer_definition.expression)
|
||||||
|
|
||||||
|
if evaluated_expression is not None:
|
||||||
|
if isinstance(self.timer_definition, TimeDateEventDefinition):
|
||||||
|
parsed_duration = TimerEventDefinition.parse_time_or_duration(evaluated_expression)
|
||||||
|
time_delta = parsed_duration - now_in_utc
|
||||||
|
return time_delta.seconds # type: ignore
|
||||||
|
elif isinstance(self.timer_definition, DurationTimerEventDefinition):
|
||||||
|
parsed_duration = TimerEventDefinition.parse_iso_duration(evaluated_expression)
|
||||||
|
time_delta = TimerEventDefinition.get_timedelta_from_start(parsed_duration, now_in_utc)
|
||||||
|
return time_delta.seconds # type: ignore
|
||||||
|
elif isinstance(self.timer_definition, CycleTimerEventDefinition):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
class StartEventConverter(EventConverter): # type: ignore
|
||||||
|
def __init__(self, registry): # type: ignore
|
||||||
|
super().__init__(StartEvent, registry)
|
|
@ -0,0 +1,135 @@
|
||||||
|
"""Test_workflow_service."""
|
||||||
|
from datetime import datetime
|
||||||
|
from datetime import timedelta
|
||||||
|
from datetime import timezone
|
||||||
|
from typing import Generator
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||||
|
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
|
||||||
|
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
|
||||||
|
from spiffworkflow_backend.services.workflow_service import (
|
||||||
|
WorkflowService,
|
||||||
|
)
|
||||||
|
from spiffworkflow_backend.specs.start_event import StartEvent
|
||||||
|
|
||||||
|
BPMN_WRAPPER = """
|
||||||
|
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
|
||||||
|
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
|
||||||
|
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"
|
||||||
|
id="Definitions_96f6665"
|
||||||
|
targetNamespace="http://bpmn.io/schema/bpmn"
|
||||||
|
exporter="Camunda Modeler"
|
||||||
|
exporterVersion="3.0.0-dev"
|
||||||
|
>
|
||||||
|
{}
|
||||||
|
</bpmn:definitions>
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def now_in_utc() -> Generator[datetime, None, None]:
|
||||||
|
yield datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def example_start_datetime_in_utc_str() -> Generator[str, None, None]:
|
||||||
|
yield "2019-10-01T12:00:00+00:00"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def example_start_datetime_minus_5_mins_in_utc(
|
||||||
|
example_start_datetime_in_utc_str: str,
|
||||||
|
) -> Generator[datetime, None, None]:
|
||||||
|
example_datetime = datetime.fromisoformat(example_start_datetime_in_utc_str)
|
||||||
|
yield example_datetime - timedelta(minutes=5)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomBpmnDmnParser(BpmnDmnParser): # type: ignore
|
||||||
|
OVERRIDE_PARSER_CLASSES = {}
|
||||||
|
OVERRIDE_PARSER_CLASSES.update(BpmnDmnParser.OVERRIDE_PARSER_CLASSES)
|
||||||
|
OVERRIDE_PARSER_CLASSES.update(SpiffBpmnParser.OVERRIDE_PARSER_CLASSES)
|
||||||
|
|
||||||
|
StartEvent.register_parser_class(OVERRIDE_PARSER_CLASSES)
|
||||||
|
|
||||||
|
|
||||||
|
def workflow_from_str(bpmn_str: str, process_id: str) -> BpmnWorkflow:
|
||||||
|
parser = CustomBpmnDmnParser()
|
||||||
|
parser.add_bpmn_str(bpmn_str)
|
||||||
|
top_level = parser.get_spec(process_id)
|
||||||
|
subprocesses = parser.get_subprocess_specs(process_id)
|
||||||
|
return BpmnWorkflow(top_level, subprocesses)
|
||||||
|
|
||||||
|
|
||||||
|
def workflow_from_fragment(bpmn_fragment: str, process_id: str) -> BpmnWorkflow:
|
||||||
|
return workflow_from_str(BPMN_WRAPPER.format(bpmn_fragment), process_id)
|
||||||
|
|
||||||
|
|
||||||
|
class TestWorkflowService(BaseTest):
|
||||||
|
"""TestWorkflowService."""
|
||||||
|
|
||||||
|
def test_run_at_delay_is_0_for_regular_start_events(self, now_in_utc: datetime) -> None:
|
||||||
|
workflow = workflow_from_fragment(
|
||||||
|
"""
|
||||||
|
<bpmn:process id="no_tasks" name="No Tasks" isExecutable="true">
|
||||||
|
<bpmn:startEvent id="StartEvent_1">
|
||||||
|
<bpmn:outgoing>Flow_184umot</bpmn:outgoing>
|
||||||
|
</bpmn:startEvent>
|
||||||
|
<bpmn:endEvent id="Event_0qq9il3">
|
||||||
|
<bpmn:incoming>Flow_184umot</bpmn:incoming>
|
||||||
|
</bpmn:endEvent>
|
||||||
|
<bpmn:sequenceFlow id="Flow_184umot" sourceRef="StartEvent_1" targetRef="Event_0qq9il3" />
|
||||||
|
</bpmn:process>
|
||||||
|
""",
|
||||||
|
"no_tasks",
|
||||||
|
)
|
||||||
|
delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc)
|
||||||
|
assert delay == 0
|
||||||
|
|
||||||
|
def test_run_at_delay_is_30_for_30_second_duration_start_timer_event(self, now_in_utc: datetime) -> None:
|
||||||
|
workflow = workflow_from_fragment(
|
||||||
|
"""
|
||||||
|
<bpmn:process id="Process_aldvgey" isExecutable="true">
|
||||||
|
<bpmn:startEvent id="StartEvent_1">
|
||||||
|
<bpmn:outgoing>Flow_1x1o335</bpmn:outgoing>
|
||||||
|
<bpmn:timerEventDefinition id="TimerEventDefinition_1vi6a54">
|
||||||
|
<bpmn:timeDuration xsi:type="bpmn:tFormalExpression">"PT30S"</bpmn:timeDuration>
|
||||||
|
</bpmn:timerEventDefinition>
|
||||||
|
</bpmn:startEvent>
|
||||||
|
<bpmn:sequenceFlow id="Flow_1x1o335" sourceRef="StartEvent_1" targetRef="Event_0upbokh" />
|
||||||
|
<bpmn:endEvent id="Event_0upbokh">
|
||||||
|
<bpmn:incoming>Flow_1x1o335</bpmn:incoming>
|
||||||
|
</bpmn:endEvent>
|
||||||
|
</bpmn:process>
|
||||||
|
""",
|
||||||
|
"Process_aldvgey",
|
||||||
|
)
|
||||||
|
delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc)
|
||||||
|
assert delay == 30
|
||||||
|
|
||||||
|
def test_run_at_delay_is_300_if_5_mins_before_date_start_timer_event(
|
||||||
|
self, example_start_datetime_in_utc_str: str, example_start_datetime_minus_5_mins_in_utc: datetime
|
||||||
|
) -> None:
|
||||||
|
workflow = workflow_from_fragment(
|
||||||
|
f"""
|
||||||
|
<bpmn:process id="Process_aldvgey" isExecutable="true">
|
||||||
|
<bpmn:startEvent id="StartEvent_1">
|
||||||
|
<bpmn:outgoing>Flow_1x1o335</bpmn:outgoing>
|
||||||
|
<bpmn:timerEventDefinition id="TimerEventDefinition_1vi6a54">
|
||||||
|
<bpmn:timeDate xsi:type="bpmn:tFormalExpression">"{example_start_datetime_in_utc_str}"</bpmn:timeDate>
|
||||||
|
</bpmn:timerEventDefinition>
|
||||||
|
</bpmn:startEvent>
|
||||||
|
<bpmn:sequenceFlow id="Flow_1x1o335" sourceRef="StartEvent_1" targetRef="Event_0upbokh" />
|
||||||
|
<bpmn:endEvent id="Event_0upbokh">
|
||||||
|
<bpmn:incoming>Flow_1x1o335</bpmn:incoming>
|
||||||
|
</bpmn:endEvent>
|
||||||
|
</bpmn:process>
|
||||||
|
""",
|
||||||
|
"Process_aldvgey",
|
||||||
|
)
|
||||||
|
delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, example_start_datetime_minus_5_mins_in_utc)
|
||||||
|
assert delay == 300
|
Loading…
Reference in New Issue