current state, db.session.begin did not actually help w/ burnettk

This commit is contained in:
jasquat 2024-11-27 14:40:25 -05:00
parent c9d7686e06
commit 480592b308
No known key found for this signature in database
3 changed files with 47 additions and 35 deletions

View File

@ -111,4 +111,3 @@ def main():
if __name__ == "__main__":
main()

View File

@ -60,6 +60,7 @@ class MessageService:
message_type=MessageTypes.receive.value,
).all()
message_instance_receive: MessageInstanceModel | None = None
processor = None
try:
for message_instance in available_receive_messages:
if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()):
@ -75,7 +76,8 @@ class MessageService:
user: UserModel | None = message_instance_send.user
if user is None:
user = UserService.find_or_create_system_user()
receiving_process_instance = MessageService.start_process_with_message(
# NEW INSTANCE
receiving_process_instance, processor = MessageService.start_process_with_message(
message_triggerable_process_model,
user,
message_instance_send=message_instance_send,
@ -89,7 +91,7 @@ class MessageService:
else:
receiving_process_instance = MessageService.get_process_instance_for_message_instance(message_instance_receive)
if message_instance_receive is not None:
if processor is None and message_instance_receive is not None:
# Set the receiving message to running, so it is not altered elswhere ...
message_instance_receive.status = "running"
db.session.add(message_instance_receive)
@ -102,25 +104,22 @@ class MessageService:
if message_instance_receive is not None:
message_instance_receive.status = "ready"
db.session.add(message_instance_receive)
db.session.commit()
if processor is not None:
processor.save()
else:
db.session.commit()
# return None
exception_message = f"Bad Message Instance: Receive {message_instance_receive}."
if receiving_process_instance:
exception_message += f" PI: {receiving_process_instance.can_receive_message()}"
if message_triggerable_process_model:
exception_message += f" TRIGGER: {message_triggerable_process_model}"
else:
exception_message += " PI: None"
exception_message += f" TRIGGER: {message_triggerable_process_model}"
exception_message += f" SEND: {message_instance_send}"
raise Exception(exception_message)
try:
with ProcessInstanceQueueService.dequeued(receiving_process_instance):
# # Set the receiving message to running, so it is not altered elswhere ...
# current_app.logger.info(f"HEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEY")
# message_instance_receive.status = "running"
# db.session.add(message_instance_receive)
# db.session.commit()
# # time.sleep(0.5)
cls.process_message_receive(
receiving_process_instance, message_instance_receive, message_instance_send, execution_mode=execution_mode
)
@ -130,7 +129,11 @@ class MessageService:
message_instance_send.status = "completed"
message_instance_send.counterpart_id = message_instance_receive.id
db.session.add(message_instance_send)
db.session.commit()
if processor is not None:
processor.save()
else:
db.session.commit()
# ALL message instances are processed and will not be picked up elsewhere
if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
queue_process_instance_if_appropriate(receiving_process_instance, execution_mode=execution_mode)
return message_instance_receive
@ -141,7 +144,10 @@ class MessageService:
if message_instance_receive is not None:
message_instance_receive.status = "ready"
db.session.add(message_instance_receive)
db.session.commit()
if processor is not None:
processor.save()
else:
db.session.commit()
# return None
raise
@ -154,7 +160,10 @@ class MessageService:
message_instance_receive.status = "failed"
message_instance_receive.failure_cause = str(exception)
db.session.add(message_instance_receive)
db.session.commit()
if processor is not None:
processor.save()
else:
db.session.commit()
raise exception
@classmethod
@ -179,7 +188,7 @@ class MessageService:
user: UserModel,
message_instance_send: MessageInstanceModel | None = None,
execution_mode: str | None = None,
) -> ProcessInstanceModel:
) -> tuple[ProcessInstanceModel, ProcessInstanceProcessor]:
"""Start up a process instance, so it is ready to catch the event."""
receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier,
@ -203,9 +212,9 @@ class MessageService:
):
processor_receive.bpmn_process_instance.correlations = message_instance_send.correlation_keys
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name)
return receiving_process_instance
return (receiving_process_instance, processor_receive)
@staticmethod
def process_message_receive(
@ -239,15 +248,15 @@ class MessageService:
if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
# even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
# we need to serialize these task updates to the db. do_engine_steps with save does that.
processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
processor_receive.do_engine_steps(save=False, execution_strategy_name="run_current_ready_tasks")
elif not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(receiving_process_instance):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
processor_receive.do_engine_steps(save=False, execution_strategy_name=execution_strategy_name)
message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive)
db.session.commit()
# db.session.commit()
@classmethod
def find_message_triggerable_process_model(cls, modified_message_name: str) -> MessageTriggerableProcessModel:

View File

@ -32,8 +32,9 @@ class ProcessInstanceQueueService:
queue_entry.locked_by = None
queue_entry.locked_at_in_seconds = None
db.session.add(queue_entry)
db.session.commit()
with db.session.session_factory() as new_session:
new_session.add(queue_entry)
new_session.commit()
@classmethod
def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel, run_at_in_seconds: int) -> None:
@ -56,16 +57,17 @@ class ProcessInstanceQueueService:
locked_by = ProcessInstanceLockService.locked_by()
current_time = round(time.time())
db.session.query(ProcessInstanceQueueModel).filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore
).update(
{
"locked_by": locked_by,
"locked_at_in_seconds": current_time,
}
)
db.session.commit()
with db.session.session_factory() as new_session:
new_session.query(ProcessInstanceQueueModel).filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore
).update(
{
"locked_by": locked_by,
"locked_at_in_seconds": current_time,
}
)
new_session.commit()
queue_entry = (
db.session.query(ProcessInstanceQueueModel)
@ -74,6 +76,7 @@ class ProcessInstanceQueueService:
)
.first()
)
db.session.refresh(queue_entry)
if queue_entry is None:
raise ProcessInstanceIsNotEnqueuedError(
@ -85,7 +88,7 @@ class ProcessInstanceQueueService:
if queue_entry.locked_by is None:
message = "It was locked by something else when we tried to lock it in the db, but it has since been unlocked."
raise ProcessInstanceIsAlreadyLockedError(
f"{locked_by} cannot lock process instance {process_instance.id}. {message}"
f"{locked_by} cannot lock process instance {process_instance.id}. {queue_entry.locked_by}. {message}"
)
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@ -115,6 +118,7 @@ class ProcessInstanceQueueService:
max_attempts: int = 1,
ignore_cannot_be_run_error: bool = False,
) -> Generator[None, None, None]:
# yield
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
if not reentering_lock: