do not save items on message start event w/ burnettk

This commit is contained in:
jasquat 2024-11-27 15:57:25 -05:00
parent 480592b308
commit b9ee34c72d
No known key found for this signature in database
6 changed files with 100 additions and 74 deletions

View File

@ -1,6 +1,7 @@
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
@ -18,6 +19,8 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel):
locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True)
status: str = db.Column(db.String(50), index=True)
process_instance = relationship(ProcessInstanceModel)
# for timers. right now the apscheduler jobs without celery check for waiting process instances.
# if the instance's run_at_in_seconds is now or earlier, the instance will run.
# so we can save some effort if we detect that it is scheduled to run later.

View File

@ -60,7 +60,7 @@ class MessageService:
message_type=MessageTypes.receive.value,
).all()
message_instance_receive: MessageInstanceModel | None = None
processor = None
processor_receive = None
try:
for message_instance in available_receive_messages:
if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()):
@ -76,8 +76,7 @@ class MessageService:
user: UserModel | None = message_instance_send.user
if user is None:
user = UserService.find_or_create_system_user()
# NEW INSTANCE
receiving_process_instance, processor = MessageService.start_process_with_message(
receiving_process_instance, processor_receive = MessageService.start_process_with_message(
message_triggerable_process_model,
user,
message_instance_send=message_instance_send,
@ -91,7 +90,7 @@ class MessageService:
else:
receiving_process_instance = MessageService.get_process_instance_for_message_instance(message_instance_receive)
if processor is None and message_instance_receive is not None:
if processor_receive is None and message_instance_receive is not None:
# Set the receiving message to running, so it is not altered elsewhere ...
message_instance_receive.status = "running"
db.session.add(message_instance_receive)
@ -104,8 +103,8 @@ class MessageService:
if message_instance_receive is not None:
message_instance_receive.status = "ready"
db.session.add(message_instance_receive)
if processor is not None:
processor.save()
if processor_receive is not None:
processor_receive.save()
else:
db.session.commit()
# return None
@ -119,9 +118,13 @@ class MessageService:
raise Exception(exception_message)
try:
with ProcessInstanceQueueService.dequeued(receiving_process_instance):
with ProcessInstanceQueueService.dequeued(receiving_process_instance, needs_dequeue=False):
cls.process_message_receive(
receiving_process_instance, message_instance_receive, message_instance_send, execution_mode=execution_mode
receiving_process_instance,
message_instance_receive,
message_instance_send,
execution_mode=execution_mode,
processor_receive=processor_receive,
)
message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id
@ -129,8 +132,8 @@ class MessageService:
message_instance_send.status = "completed"
message_instance_send.counterpart_id = message_instance_receive.id
db.session.add(message_instance_send)
if processor is not None:
processor.save()
if processor_receive is not None:
processor_receive.save()
else:
db.session.commit()
# ALL message instances are processed and will not be picked up elsewhere
@ -144,8 +147,8 @@ class MessageService:
if message_instance_receive is not None:
message_instance_receive.status = "ready"
db.session.add(message_instance_receive)
if processor is not None:
processor.save()
if processor_receive is not None:
processor_receive.save()
else:
db.session.commit()
# return None
@ -160,8 +163,8 @@ class MessageService:
message_instance_receive.status = "failed"
message_instance_receive.failure_cause = str(exception)
db.session.add(message_instance_receive)
if processor is not None:
processor.save()
if processor_receive is not None:
processor_receive.save()
else:
db.session.commit()
raise exception
@ -191,13 +194,12 @@ class MessageService:
) -> tuple[ProcessInstanceModel, ProcessInstanceProcessor]:
"""Start up a process instance, so it is ready to catch the event."""
receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier,
user,
message_triggerable_process_model.process_model_identifier, user, commit_db=False
)
with ProcessInstanceQueueService.dequeued(receiving_process_instance):
with ProcessInstanceQueueService.dequeued(receiving_process_instance, needs_dequeue=False):
processor_receive = ProcessInstanceProcessor(receiving_process_instance)
cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
processor_receive.save()
# processor_receive.save()
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
@ -212,7 +214,7 @@ class MessageService:
):
processor_receive.bpmn_process_instance.correlations = message_instance_send.correlation_keys
processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name)
processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name, needs_dequeue=False)
return (receiving_process_instance, processor_receive)
@ -222,6 +224,7 @@ class MessageService:
message_instance_receive: MessageInstanceModel,
message_instance_send: MessageInstanceModel,
execution_mode: str | None = None,
processor_receive: ProcessInstanceProcessor | None = None,
) -> None:
correlation_properties = []
for cr in message_instance_receive.correlation_rules:
@ -241,22 +244,31 @@ class MessageService:
payload=message_instance_send.payload,
correlations=message_instance_send.correlation_keys,
)
processor_receive = ProcessInstanceProcessor(receiving_process_instance)
processor_receive.bpmn_process_instance.send_event(bpmn_event)
processor_receive_to_use = processor_receive
save_engine_steps = False
if processor_receive_to_use is None:
processor_receive_to_use = ProcessInstanceProcessor(receiving_process_instance)
save_engine_steps = True
processor_receive_to_use.bpmn_process_instance.send_event(bpmn_event)
execution_strategy_name = None
if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
# even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
# we need to serialize these task updates to the db. do_engine_steps with save does that.
processor_receive.do_engine_steps(save=False, execution_strategy_name="run_current_ready_tasks")
processor_receive_to_use.do_engine_steps(
save=save_engine_steps, execution_strategy_name="run_current_ready_tasks", needs_dequeue=save_engine_steps
)
elif not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(receiving_process_instance):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name)
processor_receive_to_use.do_engine_steps(
save=save_engine_steps, execution_strategy_name=execution_strategy_name, needs_dequeue=save_engine_steps
)
message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive)
# db.session.commit()
if save_engine_steps:
db.session.commit()
@classmethod
def find_message_triggerable_process_model(cls, modified_message_name: str) -> MessageTriggerableProcessModel:

View File

@ -1575,6 +1575,7 @@ class ProcessInstanceProcessor:
execution_strategy: ExecutionStrategy | None = None,
should_schedule_waiting_timer_events: bool = True,
ignore_cannot_be_run_error: bool = False,
needs_dequeue: bool = True,
) -> TaskRunnability:
if not ignore_cannot_be_run_error and not self.process_instance_model.allowed_to_run():
raise ProcessInstanceCannotBeRunError(
@ -1582,7 +1583,7 @@ class ProcessInstanceProcessor:
f"'{self.process_instance_model.status}' and therefore cannot run."
)
if self.process_instance_model.persistence_level != "none":
with ProcessInstanceQueueService.dequeued(self.process_instance_model):
with ProcessInstanceQueueService.dequeued(self.process_instance_model, needs_dequeue=needs_dequeue):
# TODO: ideally we just lock in the execution service, but not sure
# about _add_bpmn_process_definitions and if that needs to happen in
# the same lock like it does on main
@ -1592,6 +1593,7 @@ class ProcessInstanceProcessor:
execution_strategy_name,
execution_strategy,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
needs_dequeue=needs_dequeue,
)
else:
return self._do_engine_steps(
@ -1609,6 +1611,7 @@ class ProcessInstanceProcessor:
execution_strategy_name: str | None = None,
execution_strategy: ExecutionStrategy | None = None,
should_schedule_waiting_timer_events: bool = True,
needs_dequeue: bool = True,
) -> TaskRunnability:
self._add_bpmn_process_definitions(
self.serialize(),
@ -1645,6 +1648,7 @@ class ProcessInstanceProcessor:
save,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
# profile=True,
needs_dequeue=needs_dequeue,
)
self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings()
self.check_all_tasks()

View File

@ -32,13 +32,11 @@ class ProcessInstanceQueueService:
queue_entry.locked_by = None
queue_entry.locked_at_in_seconds = None
with db.session.session_factory() as new_session:
new_session.add(queue_entry)
new_session.commit()
db.session.add(queue_entry)
@classmethod
def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel, run_at_in_seconds: int) -> None:
queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id, run_at_in_seconds=run_at_in_seconds)
queue_entry = ProcessInstanceQueueModel(process_instance=process_instance, run_at_in_seconds=run_at_in_seconds)
cls._configure_and_save_queue_entry(process_instance, queue_entry)
@classmethod
@ -51,23 +49,23 @@ class ProcessInstanceQueueService:
if current_time > queue_entry.run_at_in_seconds:
queue_entry.run_at_in_seconds = current_time
cls._configure_and_save_queue_entry(process_instance, queue_entry)
db.session.commit()
@classmethod
def _dequeue(cls, process_instance: ProcessInstanceModel) -> None:
locked_by = ProcessInstanceLockService.locked_by()
current_time = round(time.time())
with db.session.session_factory() as new_session:
new_session.query(ProcessInstanceQueueModel).filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore
).update(
{
"locked_by": locked_by,
"locked_at_in_seconds": current_time,
}
)
new_session.commit()
db.session.query(ProcessInstanceQueueModel).filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore
).update(
{
"locked_by": locked_by,
"locked_at_in_seconds": current_time,
}
)
db.session.commit()
queue_entry = (
db.session.query(ProcessInstanceQueueModel)
@ -117,35 +115,41 @@ class ProcessInstanceQueueService:
process_instance: ProcessInstanceModel,
max_attempts: int = 1,
ignore_cannot_be_run_error: bool = False,
needs_dequeue: bool = True,
) -> Generator[None, None, None]:
# yield
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
# needs_dequeue is more of a hack so we can avoid db commits in special code paths.
# ideally all commits would happen at the top level such as in controllers or in background
# service entry paths and then we can lock from those same or closer locations when necessary.
if needs_dequeue:
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
if not reentering_lock:
# this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
# that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
cls._dequeue_with_retries(process_instance, max_attempts=max_attempts)
try:
yield
except ProcessInstanceCannotBeRunError as ex:
if not ignore_cannot_be_run_error:
if not reentering_lock:
# this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
# that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
cls._dequeue_with_retries(process_instance, max_attempts=max_attempts)
try:
yield
except ProcessInstanceCannotBeRunError as ex:
if not ignore_cannot_be_run_error:
raise ex
except Exception as ex:
# these events are handled in the WorkflowExecutionService.
# that is, we don't need to add error_detail records here, etc.
if not isinstance(ex, WorkflowExecutionServiceError):
ProcessInstanceTmpService.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex
)
# we call dequeued multiple times but we want this code to only happen once.
# assume that if we are not reentering_lock then this is the top level call and should be the one to handle the error.
if not reentering_lock:
ErrorHandlingService.handle_error(process_instance, ex)
raise ex
except Exception as ex:
# these events are handled in the WorkflowExecutionService.
# that is, we don't need to add error_detail records here, etc.
if not isinstance(ex, WorkflowExecutionServiceError):
ProcessInstanceTmpService.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex
)
# we call dequeued multiple times but we want this code to only happen once.
# assume that if we are not reentering_lock then this is the top level call and should be the one to handle the error.
if not reentering_lock:
ErrorHandlingService.handle_error(process_instance, ex)
raise ex
finally:
if not reentering_lock:
cls._enqueue(process_instance)
finally:
if not reentering_lock:
cls._enqueue(process_instance)
else:
yield
@classmethod
def entries_with_status(

View File

@ -130,7 +130,7 @@ class ProcessInstanceService:
start_configuration: StartConfiguration | None = None,
) -> tuple[ProcessInstanceModel, StartConfiguration]:
# FIXME: this should not be necessary. fix the code paths that make this necessary
db.session.commit()
# db.session.commit()
git_revision_error = None
try:
current_git_revision = GitService.get_current_revision()
@ -147,7 +147,6 @@ class ProcessInstanceService:
bpmn_version_control_identifier=current_git_revision,
)
db.session.add(process_instance_model)
db.session.commit()
if git_revision_error is not None:
message = (
@ -381,10 +380,13 @@ class ProcessInstanceService:
cls,
process_model_identifier: str,
user: UserModel,
commit_db: bool = True,
) -> ProcessInstanceModel:
process_model = ProcessModelService.get_process_model(process_model_identifier)
process_instance_model, (cycle_count, _, duration_in_seconds) = cls.create_process_instance(process_model, user)
cls.register_process_model_cycles(process_model_identifier, cycle_count, duration_in_seconds)
if commit_db:
db.session.commit()
return process_instance_model
@classmethod
@ -398,8 +400,6 @@ class ProcessInstanceService:
for cycle in cycles:
db.session.delete(cycle)
db.session.commit()
if cycle_count != 0:
if duration_in_seconds == 0:
raise ApiError(
@ -414,7 +414,6 @@ class ProcessInstanceService:
current_cycle=0,
)
db.session.add(cycle)
db.session.commit()
@classmethod
def schedule_next_process_model_cycle(cls, process_instance_model: ProcessInstanceModel) -> None:

View File

@ -509,6 +509,7 @@ class WorkflowExecutionService:
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
profile: bool = False,
needs_dequeue: bool = True,
) -> TaskRunnability:
if profile:
import cProfile
@ -516,19 +517,22 @@ class WorkflowExecutionService:
task_runnability = TaskRunnability.unknown_if_ready_tasks
with cProfile.Profile() as pr:
task_runnability = self._run_and_save(exit_at, save, should_schedule_waiting_timer_events)
task_runnability = self._run_and_save(
exit_at, save, should_schedule_waiting_timer_events, needs_dequeue=needs_dequeue
)
pr.print_stats(sort=SortKey.CUMULATIVE)
return task_runnability
return self._run_and_save(exit_at, save, should_schedule_waiting_timer_events)
return self._run_and_save(exit_at, save, should_schedule_waiting_timer_events, needs_dequeue=needs_dequeue)
def _run_and_save(
self,
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
needs_dequeue: bool = True,
) -> TaskRunnability:
if self.process_instance_model.persistence_level != "none":
if needs_dequeue and self.process_instance_model.persistence_level != "none":
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped:
raise AssertionError(