From 1486edbf9719a2f220833542f5fdc9b062993d49 Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Tue, 30 May 2023 13:51:37 -0400 Subject: [PATCH] Cycle Timer Start Event Support (#285) --- .../migrations/versions/e4b6bbf83a3e_.py | 42 +++++++++++ .../load_database_models.py | 3 + .../models/process_model_cycle.py | 19 +++++ .../services/email_service.py | 1 - .../services/message_service.py | 1 - .../services/process_instance_service.py | 73 +++++++++++++++++-- .../services/secret_service.py | 1 - .../services/workflow_service.py | 30 +++----- .../specs/start_event.py | 34 ++++++--- .../unit/test_workflow_service.py | 8 +- 10 files changed, 167 insertions(+), 45 deletions(-) create mode 100644 spiffworkflow-backend/migrations/versions/e4b6bbf83a3e_.py create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/models/process_model_cycle.py diff --git a/spiffworkflow-backend/migrations/versions/e4b6bbf83a3e_.py b/spiffworkflow-backend/migrations/versions/e4b6bbf83a3e_.py new file mode 100644 index 00000000..82cb312d --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/e4b6bbf83a3e_.py @@ -0,0 +1,42 @@ +"""empty message + +Revision ID: e4b6bbf83a3e +Revises: 6aa02463da9c +Create Date: 2023-05-30 10:17:10.595965 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'e4b6bbf83a3e' +down_revision = '6aa02463da9c' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('process_model_cycle', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_model_identifier', sa.String(length=255), nullable=False), + sa.Column('cycle_count', sa.Integer(), nullable=True), + sa.Column('duration_in_seconds', sa.Integer(), nullable=True), + sa.Column('current_cycle', sa.Integer(), nullable=True), + sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('process_model_cycle', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_process_model_cycle_process_model_identifier'), ['process_model_identifier'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('process_model_cycle', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_process_model_cycle_process_model_identifier')) + + op.drop_table('process_model_cycle') + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/load_database_models.py b/spiffworkflow-backend/src/spiffworkflow_backend/load_database_models.py index ed542761..22e570e1 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/load_database_models.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/load_database_models.py @@ -76,5 +76,8 @@ from spiffworkflow_backend.models.process_instance_queue import ( from spiffworkflow_backend.models.active_user import ( ActiveUserModel, ) # noqa: F401 +from spiffworkflow_backend.models.process_model_cycle import ( + ProcessModelCycleModel, +) # noqa: F401 add_listeners() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_model_cycle.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_model_cycle.py new file mode 100644 index 00000000..bc745590 --- /dev/null +++ 
b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_model_cycle.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass + +from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +from spiffworkflow_backend.models.db import db + + +@dataclass +class ProcessModelCycleModel(SpiffworkflowBaseDBModel): +    """ProcessModelCycleModel.""" + +    __tablename__ = "process_model_cycle" + +    id: int = db.Column(db.Integer, primary_key=True) +    process_model_identifier: str = db.Column(db.String(255), nullable=False, index=True) +    cycle_count: int = db.Column(db.Integer) +    duration_in_seconds: int = db.Column(db.Integer) +    current_cycle: int = db.Column(db.Integer) +    updated_at_in_seconds: int = db.Column(db.Integer) +    created_at_in_seconds: int = db.Column(db.Integer) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/email_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/email_service.py index 7108dc58..0d6da09e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/email_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/email_service.py @@ -1,4 +1,3 @@ - from flask import current_app from flask_mail import Message  # type: ignore diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index ce7240f5..d26aa8b0 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -1,4 +1,3 @@ - from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageStatuses diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py 
b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 4a8606ca..f342f5f0 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -21,6 +21,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel from spiffworkflow_backend.models.process_model import ProcessModelInfo +from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel from spiffworkflow_backend.models.task import Task from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService @@ -33,6 +34,7 @@ from spiffworkflow_backend.services.process_instance_queue_service import Proces from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.workflow_service import WorkflowService +from spiffworkflow_backend.specs.start_event import StartConfiguration class ProcessInstanceService: @@ -42,22 +44,27 @@ class ProcessInstanceService: TASK_STATE_LOCKED = "locked" @staticmethod - def calculate_start_delay_in_seconds(process_instance_model: ProcessInstanceModel) -> int: + def next_start_event_configuration(process_instance_model: ProcessInstanceModel) -> StartConfiguration: try: processor = ProcessInstanceProcessor(process_instance_model) - delay_in_seconds = WorkflowService.calculate_run_at_delay_in_seconds( + start_configuration = WorkflowService.next_start_event_configuration( processor.bpmn_process_instance, datetime.now(timezone.utc) ) except Exception: - delay_in_seconds = 0 - 
return delay_in_seconds + start_configuration = None + + if start_configuration is None: + start_configuration = (0, 0, 0) + + return start_configuration @classmethod def create_process_instance( cls, process_model: ProcessModelInfo, user: UserModel, - ) -> ProcessInstanceModel: + start_configuration: StartConfiguration | None = None, + ) -> tuple[ProcessInstanceModel, StartConfiguration]: """Get_process_instance_from_spec.""" db.session.commit() try: @@ -75,10 +82,13 @@ class ProcessInstanceService: ) db.session.add(process_instance_model) db.session.commit() - delay_in_seconds = cls.calculate_start_delay_in_seconds(process_instance_model) + + if start_configuration is None: + start_configuration = cls.next_start_event_configuration(process_instance_model) + _, delay_in_seconds, _ = start_configuration run_at_in_seconds = round(time.time()) + delay_in_seconds ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds) - return process_instance_model + return (process_instance_model, start_configuration) @classmethod def create_process_instance_from_process_model_identifier( @@ -88,7 +98,52 @@ class ProcessInstanceService: ) -> ProcessInstanceModel: """Create_process_instance_from_process_model_identifier.""" process_model = ProcessModelService.get_process_model(process_model_identifier) - return cls.create_process_instance(process_model, user) + process_instance_model, (cycle_count, _, duration_in_seconds) = cls.create_process_instance( + process_model, user + ) + cls.register_process_model_cycles(process_model_identifier, cycle_count, duration_in_seconds) + return process_instance_model + + @classmethod + def register_process_model_cycles( + cls, process_model_identifier: str, cycle_count: int, duration_in_seconds: int + ) -> None: + # clean up old cycle record if it exists. 
even if the given cycle_count is 0 the previous version +        # of the model could have included a cycle timer start event +        cycles = ProcessModelCycleModel.query.filter( +            ProcessModelCycleModel.process_model_identifier == process_model_identifier, +        ).all() + +        for cycle in cycles: +            db.session.delete(cycle) + +        if cycle_count != 0: +            cycle = ProcessModelCycleModel( +                process_model_identifier=process_model_identifier, +                cycle_count=cycle_count, +                duration_in_seconds=duration_in_seconds, +                current_cycle=0, +            ) +            db.session.add(cycle) + +        db.session.commit() + +    @classmethod +    def schedule_next_process_model_cycle(cls, process_instance_model: ProcessInstanceModel) -> None: +        cycle = ProcessModelCycleModel.query.filter( +            ProcessModelCycleModel.process_model_identifier == process_instance_model.process_model_identifier +        ).first() + +        if cycle is None or cycle.cycle_count == 0: +            return + +        if cycle.cycle_count == -1 or cycle.current_cycle < cycle.cycle_count: +            process_model = ProcessModelService.get_process_model(process_instance_model.process_model_identifier) +            start_configuration = (cycle.cycle_count, cycle.duration_in_seconds, cycle.duration_in_seconds) +            cls.create_process_instance(process_model, process_instance_model.process_initiator, start_configuration) +            cycle.current_cycle += 1 +            db.session.add(cycle) +            db.session.commit() @classmethod def waiting_event_can_be_skipped(cls, waiting_event: dict[str, Any], now_in_utc: datetime) -> bool: @@ -155,6 +210,8 @@ class ProcessInstanceService: cls.run_process_instance_with_processor( process_instance, status_value=status_value, execution_strategy_name=execution_strategy_name ) +                    if process_instance.status == "complete": +                        cls.schedule_next_process_model_cycle(process_instance) except ProcessInstanceIsAlreadyLockedError: continue except Exception as e: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/secret_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/secret_service.py 
index da0f5c36..6edeb364 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/secret_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/secret_service.py @@ -1,4 +1,3 @@ - from flask import current_app from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.db import db diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py index 5668226d..83acc4e2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_service.py @@ -3,6 +3,7 @@ from datetime import datetime from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState +from spiffworkflow_backend.specs.start_event import StartConfiguration from spiffworkflow_backend.specs.start_event import StartEvent @@ -14,24 +15,13 @@ class WorkflowService: return [t for t in workflow.get_tasks(TaskState.FUTURE) if isinstance(t.task_spec, StartEvent)] @classmethod - def next_start_event_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int: + def next_start_event_configuration(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> StartConfiguration | None: start_events = cls.future_start_events(workflow) - start_delays: list[int] = [] - for start_event in start_events: - start_delay = start_event.task_spec.start_delay_in_seconds(start_event, now_in_utc) - start_delays.append(start_delay) - start_delays.sort() - return start_delays[0] if len(start_delays) > 0 else 0 - - @classmethod - def calculate_run_at_delay_in_seconds(cls, workflow: BpmnWorkflow, now_in_utc: datetime) -> int: - # TODO: for now we are using the first start time because I am not sure how multiple - # start events should work. 
I think the right answer is to take the earliest start - # time and have later start events stay FUTURE/WAITING?, then we need to be able - # to respect the other start events when enqueue'ing. - # - # TODO: this method should also expand to include other FUTURE/WAITING timers when - # enqueue'ing so that we don't have to check timers every 10 or whatever seconds - # right now we assume that this is being called to create a process - - return cls.next_start_event_delay_in_seconds(workflow, now_in_utc) + configurations = list( + map( + lambda start_event: start_event.task_spec.configuration(start_event, now_in_utc), # type: ignore + start_events, + ) + ) + configurations.sort(key=lambda configuration: configuration[1]) # type: ignore + return configurations[0] if len(configurations) > 0 else None diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py index 56e238f3..5cb929e4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/specs/start_event.py @@ -13,8 +13,11 @@ from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore +StartConfiguration = tuple[int, int, int] # TODO: cylce timers and repeat counts? 
+ + class StartEvent(DefaultStartEvent): # type: ignore def __init__(self, wf_spec, bpmn_id, event_definition, **kwargs): # type: ignore if isinstance(event_definition, TimerEventDefinition): @@ -33,27 +36,36 @@ class StartEvent(DefaultStartEvent): # type: ignore def register_parser_class(parser_config: dict[str, Any]) -> None: parser_config[full_tag("startEvent")] = (SpiffStartEventParser, StartEvent) - def start_delay_in_seconds(self, my_task: SpiffTask, now_in_utc: datetime) -> int: - script_engine = my_task.workflow.script_engine - evaluated_expression = None - parsed_duration = None - - if isinstance(self.timer_definition, TimerEventDefinition) and script_engine is not None: - evaluated_expression = script_engine.evaluate(my_task, self.timer_definition.expression) + def configuration(self, my_task: SpiffTask, now_in_utc: datetime) -> StartConfiguration: + evaluated_expression = self.evaluated_timer_expression(my_task) + cycles = 0 + start_delay_in_seconds = 0 + duration = 0 if evaluated_expression is not None: if isinstance(self.timer_definition, TimeDateEventDefinition): parsed_duration = TimerEventDefinition.parse_time_or_duration(evaluated_expression) time_delta = parsed_duration - now_in_utc - return time_delta.seconds # type: ignore + start_delay_in_seconds = time_delta.seconds elif isinstance(self.timer_definition, DurationTimerEventDefinition): parsed_duration = TimerEventDefinition.parse_iso_duration(evaluated_expression) time_delta = TimerEventDefinition.get_timedelta_from_start(parsed_duration, now_in_utc) - return time_delta.seconds # type: ignore + start_delay_in_seconds = time_delta.seconds elif isinstance(self.timer_definition, CycleTimerEventDefinition): - return 0 + cycles, start, cycle_duration = TimerEventDefinition.parse_iso_recurring_interval(evaluated_expression) + time_delta = start - now_in_utc + cycle_duration + start_delay_in_seconds = time_delta.seconds + duration = cycle_duration.seconds - return 0 + return (cycles, 
start_delay_in_seconds, duration) + + def evaluated_timer_expression(self, my_task: SpiffTask) -> Any: + script_engine = my_task.workflow.script_engine + evaluated_expression = None + + if isinstance(self.timer_definition, TimerEventDefinition) and script_engine is not None: + evaluated_expression = script_engine.evaluate(my_task, self.timer_definition.expression) + return evaluated_expression class StartEventConverter(EventConverter): # type: ignore diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py index 3428c60c..26537d92 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_workflow_service.py @@ -84,7 +84,7 @@ class TestWorkflowService(BaseTest): """, "no_tasks", ) - delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc) + _, delay, _ = WorkflowService.next_start_event_configuration(workflow, now_in_utc) # type: ignore assert delay == 0 def test_run_at_delay_is_30_for_30_second_duration_start_timer_event(self, now_in_utc: datetime) -> None: @@ -105,7 +105,7 @@ class TestWorkflowService(BaseTest): """, "Process_aldvgey", ) - delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, now_in_utc) + _, delay, _ = WorkflowService.next_start_event_configuration(workflow, now_in_utc) # type: ignore assert delay == 30 def test_run_at_delay_is_300_if_5_mins_before_date_start_timer_event( @@ -128,5 +128,7 @@ class TestWorkflowService(BaseTest): """, "Process_aldvgey", ) - delay = WorkflowService.calculate_run_at_delay_in_seconds(workflow, example_start_datetime_minus_5_mins_in_utc) + _, delay, _ = WorkflowService.next_start_event_configuration( + workflow, example_start_datetime_minus_5_mins_in_utc + ) # type: ignore assert delay == 300