Trip safe asserts in tests, various process instance queue improvements (#199)
This commit is contained in:
parent
457487ff63
commit
a1a54c54bb
|
@ -2,3 +2,4 @@ pyrightconfig.json
|
||||||
.idea/
|
.idea/
|
||||||
t
|
t
|
||||||
.dccache
|
.dccache
|
||||||
|
*~
|
|
@ -139,13 +139,5 @@ SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB = environ.get(
|
||||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy"
|
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy"
|
||||||
)
|
)
|
||||||
|
|
||||||
SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES = int(
|
|
||||||
environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES", default="3")
|
|
||||||
)
|
|
||||||
|
|
||||||
SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS = int(
|
|
||||||
environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS", default="1")
|
|
||||||
)
|
|
||||||
|
|
||||||
# this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration
|
# this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration
|
||||||
SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None)
|
SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None)
|
||||||
|
|
|
@ -56,9 +56,6 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS
|
||||||
from spiffworkflow_backend.services.git_service import GitCommandError
|
from spiffworkflow_backend.services.git_service import GitCommandError
|
||||||
from spiffworkflow_backend.services.git_service import GitService
|
from spiffworkflow_backend.services.git_service import GitService
|
||||||
from spiffworkflow_backend.services.message_service import MessageService
|
from spiffworkflow_backend.services.message_service import MessageService
|
||||||
from spiffworkflow_backend.services.process_instance_lock_service import (
|
|
||||||
ProcessInstanceLockService,
|
|
||||||
)
|
|
||||||
from spiffworkflow_backend.services.process_instance_processor import (
|
from spiffworkflow_backend.services.process_instance_processor import (
|
||||||
ProcessInstanceProcessor,
|
ProcessInstanceProcessor,
|
||||||
)
|
)
|
||||||
|
@ -105,7 +102,6 @@ def process_instance_create(
|
||||||
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||||
process_model_identifier, g.user
|
process_model_identifier, g.user
|
||||||
)
|
)
|
||||||
ProcessInstanceQueueService.enqueue(process_instance)
|
|
||||||
return Response(
|
return Response(
|
||||||
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
||||||
status=201,
|
status=201,
|
||||||
|
@ -131,7 +127,6 @@ def process_instance_run(
|
||||||
|
|
||||||
if do_engine_steps:
|
if do_engine_steps:
|
||||||
try:
|
try:
|
||||||
processor.lock_process_instance("Web")
|
|
||||||
processor.do_engine_steps(save=True)
|
processor.do_engine_steps(save=True)
|
||||||
except (
|
except (
|
||||||
ApiError,
|
ApiError,
|
||||||
|
@ -150,9 +145,6 @@ def process_instance_run(
|
||||||
status_code=400,
|
status_code=400,
|
||||||
task=task,
|
task=task,
|
||||||
) from e
|
) from e
|
||||||
finally:
|
|
||||||
if ProcessInstanceLockService.has_lock(process_instance.id):
|
|
||||||
processor.unlock_process_instance("Web")
|
|
||||||
|
|
||||||
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
||||||
MessageService.correlate_all_message_instances()
|
MessageService.correlate_all_message_instances()
|
||||||
|
@ -173,14 +165,11 @@ def process_instance_terminate(
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
processor.lock_process_instance("Web")
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
processor.terminate()
|
processor.terminate()
|
||||||
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService().handle_error(processor, e)
|
||||||
raise e
|
raise e
|
||||||
finally:
|
|
||||||
if ProcessInstanceLockService.has_lock(process_instance.id):
|
|
||||||
processor.unlock_process_instance("Web")
|
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
|
||||||
|
@ -194,14 +183,11 @@ def process_instance_suspend(
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
processor.lock_process_instance("Web")
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
processor.suspend()
|
processor.suspend()
|
||||||
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService().handle_error(processor, e)
|
||||||
raise e
|
raise e
|
||||||
finally:
|
|
||||||
if ProcessInstanceLockService.has_lock(process_instance.id):
|
|
||||||
processor.unlock_process_instance("Web")
|
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
|
||||||
|
@ -215,14 +201,11 @@ def process_instance_resume(
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
processor.lock_process_instance("Web")
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
processor.resume()
|
processor.resume()
|
||||||
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e:
|
||||||
ErrorHandlingService().handle_error(processor, e)
|
ErrorHandlingService().handle_error(processor, e)
|
||||||
raise e
|
raise e
|
||||||
finally:
|
|
||||||
if ProcessInstanceLockService.has_lock(process_instance.id):
|
|
||||||
processor.unlock_process_instance("Web")
|
|
||||||
|
|
||||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,9 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||||
from spiffworkflow_backend.services.process_instance_processor import (
|
from spiffworkflow_backend.services.process_instance_processor import (
|
||||||
ProcessInstanceProcessor,
|
ProcessInstanceProcessor,
|
||||||
)
|
)
|
||||||
|
from spiffworkflow_backend.services.process_instance_queue_service import (
|
||||||
|
ProcessInstanceQueueService,
|
||||||
|
)
|
||||||
from spiffworkflow_backend.services.process_instance_service import (
|
from spiffworkflow_backend.services.process_instance_service import (
|
||||||
ProcessInstanceService,
|
ProcessInstanceService,
|
||||||
)
|
)
|
||||||
|
@ -426,13 +429,8 @@ def task_submit_shared(
|
||||||
only_tasks_that_can_be_completed=True,
|
only_tasks_that_can_be_completed=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
retry_times = current_app.config["SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES"]
|
|
||||||
retry_interval_in_seconds = current_app.config[
|
|
||||||
"SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS"
|
|
||||||
]
|
|
||||||
|
|
||||||
with sentry_sdk.start_span(op="task", description="complete_form_task"):
|
with sentry_sdk.start_span(op="task", description="complete_form_task"):
|
||||||
processor.lock_process_instance("Web", retry_times, retry_interval_in_seconds)
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
ProcessInstanceService.complete_form_task(
|
ProcessInstanceService.complete_form_task(
|
||||||
processor=processor,
|
processor=processor,
|
||||||
spiff_task=spiff_task,
|
spiff_task=spiff_task,
|
||||||
|
@ -440,7 +438,6 @@ def task_submit_shared(
|
||||||
user=g.user,
|
user=g.user,
|
||||||
human_task=human_task,
|
human_task=human_task,
|
||||||
)
|
)
|
||||||
processor.unlock_process_instance("Web")
|
|
||||||
|
|
||||||
# If we need to update all tasks, then get the next ready task and if it a multi-instance with the same
|
# If we need to update all tasks, then get the next ready task and if it a multi-instance with the same
|
||||||
# task spec, complete that form as well.
|
# task spec, complete that form as well.
|
||||||
|
|
|
@ -14,5 +14,5 @@ def safe_assertion(condition: bool) -> Generator[bool, None, None]:
|
||||||
if not condition:
|
if not condition:
|
||||||
sentry_sdk.capture_exception(e)
|
sentry_sdk.capture_exception(e)
|
||||||
current_app.logger.exception(e)
|
current_app.logger.exception(e)
|
||||||
if current_app.config["ENV_IDENTIFIER"] == "local_development":
|
if current_app.config["ENV_IDENTIFIER"] in ["local_development", "unit_testing"]:
|
||||||
raise e
|
raise e
|
||||||
|
|
|
@ -89,7 +89,6 @@ from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.scripts.script import Script
|
from spiffworkflow_backend.scripts.script import Script
|
||||||
from spiffworkflow_backend.services.custom_parser import MyCustomParser
|
from spiffworkflow_backend.services.custom_parser import MyCustomParser
|
||||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||||
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
|
|
||||||
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
|
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
|
||||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||||
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
|
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
|
||||||
|
@ -1544,29 +1543,6 @@ class ProcessInstanceProcessor:
|
||||||
# current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}")
|
# current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}")
|
||||||
return the_status
|
return the_status
|
||||||
|
|
||||||
# TODO: replace with implicit/more granular locking in workflow execution service
|
|
||||||
# TODO: remove the retry logic once all user_input_required's don't need to be locked to check timers
|
|
||||||
def lock_process_instance(
|
|
||||||
self, lock_prefix: str, retry_count: int = 0, retry_interval_in_seconds: int = 0
|
|
||||||
) -> None:
|
|
||||||
try:
|
|
||||||
ProcessInstanceQueueService.dequeue(self.process_instance_model)
|
|
||||||
except ProcessInstanceIsAlreadyLockedError as e:
|
|
||||||
if retry_count > 0:
|
|
||||||
current_app.logger.info(
|
|
||||||
f"process_instance_id {self.process_instance_model.id} is locked. "
|
|
||||||
f"will retry {retry_count} times with delay of {retry_interval_in_seconds}."
|
|
||||||
)
|
|
||||||
if retry_interval_in_seconds > 0:
|
|
||||||
time.sleep(retry_interval_in_seconds)
|
|
||||||
self.lock_process_instance(lock_prefix, retry_count - 1, retry_interval_in_seconds)
|
|
||||||
else:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
# TODO: replace with implicit/more granular locking in workflow execution service
|
|
||||||
def unlock_process_instance(self, lock_prefix: str) -> None:
|
|
||||||
ProcessInstanceQueueService.enqueue(self.process_instance_model)
|
|
||||||
|
|
||||||
def process_bpmn_messages(self) -> None:
|
def process_bpmn_messages(self) -> None:
|
||||||
"""Process_bpmn_messages."""
|
"""Process_bpmn_messages."""
|
||||||
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
||||||
|
@ -1622,6 +1598,18 @@ class ProcessInstanceProcessor:
|
||||||
exit_at: None = None,
|
exit_at: None = None,
|
||||||
save: bool = False,
|
save: bool = False,
|
||||||
execution_strategy_name: Optional[str] = None,
|
execution_strategy_name: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
with ProcessInstanceQueueService.dequeued(self.process_instance_model):
|
||||||
|
# TODO: ideally we just lock in the execution service, but not sure
|
||||||
|
# about _add_bpmn_process_definitions and if that needs to happen in
|
||||||
|
# the same lock like it does on main
|
||||||
|
self._do_engine_steps(exit_at, save, execution_strategy_name)
|
||||||
|
|
||||||
|
def _do_engine_steps(
|
||||||
|
self,
|
||||||
|
exit_at: None = None,
|
||||||
|
save: bool = False,
|
||||||
|
execution_strategy_name: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._add_bpmn_process_definitions()
|
self._add_bpmn_process_definitions()
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
|
import contextlib
|
||||||
import time
|
import time
|
||||||
|
from typing import Generator
|
||||||
from typing import List
|
from typing import List
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from flask import current_app
|
|
||||||
|
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
|
@ -26,28 +26,32 @@ class ProcessInstanceIsAlreadyLockedError(Exception):
|
||||||
class ProcessInstanceQueueService:
|
class ProcessInstanceQueueService:
|
||||||
"""TODO: comment."""
|
"""TODO: comment."""
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def enqueue(process_instance: ProcessInstanceModel) -> None:
|
def _configure_and_save_queue_entry(
|
||||||
queue_item = ProcessInstanceLockService.try_unlock(process_instance.id)
|
cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
|
||||||
|
) -> None:
|
||||||
if queue_item is None:
|
|
||||||
queue_item = ProcessInstanceQueueModel(process_instance_id=process_instance.id)
|
|
||||||
|
|
||||||
# TODO: configurable params (priority/run_at)
|
# TODO: configurable params (priority/run_at)
|
||||||
queue_item.run_at_in_seconds = round(time.time())
|
queue_entry.run_at_in_seconds = round(time.time())
|
||||||
queue_item.priority = 2
|
queue_entry.priority = 2
|
||||||
queue_item.status = process_instance.status
|
queue_entry.status = process_instance.status
|
||||||
queue_item.locked_by = None
|
queue_entry.locked_by = None
|
||||||
queue_item.locked_at_in_seconds = None
|
queue_entry.locked_at_in_seconds = None
|
||||||
|
|
||||||
db.session.add(queue_item)
|
db.session.add(queue_entry)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def dequeue(process_instance: ProcessInstanceModel) -> None:
|
def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel) -> None:
|
||||||
if ProcessInstanceLockService.has_lock(process_instance.id):
|
queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id)
|
||||||
return
|
cls._configure_and_save_queue_entry(process_instance, queue_entry)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _enqueue(cls, process_instance: ProcessInstanceModel) -> None:
|
||||||
|
queue_entry = ProcessInstanceLockService.unlock(process_instance.id)
|
||||||
|
cls._configure_and_save_queue_entry(process_instance, queue_entry)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _dequeue(cls, process_instance: ProcessInstanceModel) -> None:
|
||||||
locked_by = ProcessInstanceLockService.locked_by()
|
locked_by = ProcessInstanceLockService.locked_by()
|
||||||
|
|
||||||
db.session.query(ProcessInstanceQueueModel).filter(
|
db.session.query(ProcessInstanceQueueModel).filter(
|
||||||
|
@ -82,6 +86,18 @@ class ProcessInstanceQueueService:
|
||||||
|
|
||||||
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
|
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]:
|
||||||
|
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
try:
|
||||||
|
if not reentering_lock:
|
||||||
|
cls._dequeue(process_instance)
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
if not reentering_lock:
|
||||||
|
cls._enqueue(process_instance)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def entries_with_status(
|
def entries_with_status(
|
||||||
cls,
|
cls,
|
||||||
|
@ -105,31 +121,3 @@ class ProcessInstanceQueueService:
|
||||||
queue_entries = cls.entries_with_status(status_value, None)
|
queue_entries = cls.entries_with_status(status_value, None)
|
||||||
ids_with_status = [entry.process_instance_id for entry in queue_entries]
|
ids_with_status = [entry.process_instance_id for entry in queue_entries]
|
||||||
return ids_with_status
|
return ids_with_status
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def dequeue_many(
|
|
||||||
cls,
|
|
||||||
status_value: str = ProcessInstanceStatus.waiting.value,
|
|
||||||
) -> List[int]:
|
|
||||||
locked_by = ProcessInstanceLockService.locked_by()
|
|
||||||
|
|
||||||
# TODO: configurable params (priority/run_at/limit)
|
|
||||||
db.session.query(ProcessInstanceQueueModel).filter(
|
|
||||||
ProcessInstanceQueueModel.status == status_value,
|
|
||||||
ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore
|
|
||||||
).update(
|
|
||||||
{
|
|
||||||
"locked_by": locked_by,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
queue_entries = cls.entries_with_status(status_value, locked_by)
|
|
||||||
|
|
||||||
locked_ids = ProcessInstanceLockService.lock_many(queue_entries)
|
|
||||||
|
|
||||||
if len(locked_ids) > 0:
|
|
||||||
current_app.logger.info(f"{locked_by} dequeued_many: {locked_ids}")
|
|
||||||
|
|
||||||
return locked_ids
|
|
||||||
|
|
|
@ -70,6 +70,7 @@ class ProcessInstanceService:
|
||||||
)
|
)
|
||||||
db.session.add(process_instance_model)
|
db.session.add(process_instance_model)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model)
|
||||||
return process_instance_model
|
return process_instance_model
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -111,9 +112,7 @@ class ProcessInstanceService:
|
||||||
.filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
|
.filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
|
||||||
.all()
|
.all()
|
||||||
)
|
)
|
||||||
process_instance_lock_prefix = "Background"
|
|
||||||
for process_instance in records:
|
for process_instance in records:
|
||||||
locked = False
|
|
||||||
processor = None
|
processor = None
|
||||||
try:
|
try:
|
||||||
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
||||||
|
@ -122,8 +121,6 @@ class ProcessInstanceService:
|
||||||
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
processor.lock_process_instance(process_instance_lock_prefix)
|
|
||||||
locked = True
|
|
||||||
db.session.refresh(process_instance)
|
db.session.refresh(process_instance)
|
||||||
if process_instance.status == status_value:
|
if process_instance.status == status_value:
|
||||||
execution_strategy_name = current_app.config[
|
execution_strategy_name = current_app.config[
|
||||||
|
@ -142,9 +139,6 @@ class ProcessInstanceService:
|
||||||
+ f"({process_instance.process_model_identifier}). {str(e)}"
|
+ f"({process_instance.process_model_identifier}). {str(e)}"
|
||||||
)
|
)
|
||||||
current_app.logger.error(error_message)
|
current_app.logger.error(error_message)
|
||||||
finally:
|
|
||||||
if locked and processor:
|
|
||||||
processor.unlock_process_instance(process_instance_lock_prefix)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def processor_to_process_instance_api(
|
def processor_to_process_instance_api(
|
||||||
|
|
|
@ -304,7 +304,7 @@ class BaseTest:
|
||||||
db.session.add(process_instance)
|
db.session.add(process_instance)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
ProcessInstanceQueueService.enqueue(process_instance)
|
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance)
|
||||||
|
|
||||||
return process_instance
|
return process_instance
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,6 @@ from spiffworkflow_backend.services.authorization_service import (
|
||||||
from spiffworkflow_backend.services.process_instance_processor import (
|
from spiffworkflow_backend.services.process_instance_processor import (
|
||||||
ProcessInstanceProcessor,
|
ProcessInstanceProcessor,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.services.process_instance_queue_service import (
|
|
||||||
ProcessInstanceIsAlreadyLockedError,
|
|
||||||
)
|
|
||||||
from spiffworkflow_backend.services.process_instance_service import (
|
from spiffworkflow_backend.services.process_instance_service import (
|
||||||
ProcessInstanceService,
|
ProcessInstanceService,
|
||||||
)
|
)
|
||||||
|
@ -632,43 +629,6 @@ class TestProcessInstanceProcessor(BaseTest):
|
||||||
assert len(process_instance.active_human_tasks) == 1
|
assert len(process_instance.active_human_tasks) == 1
|
||||||
assert initial_human_task_id == process_instance.active_human_tasks[0].id
|
assert initial_human_task_id == process_instance.active_human_tasks[0].id
|
||||||
|
|
||||||
# TODO: port this test to queue_service test
|
|
||||||
def xxx_test_it_can_lock_and_unlock_a_process_instance(
|
|
||||||
self,
|
|
||||||
app: Flask,
|
|
||||||
client: FlaskClient,
|
|
||||||
with_db_and_bpmn_file_cleanup: None,
|
|
||||||
) -> None:
|
|
||||||
initiator_user = self.find_or_create_user("initiator_user")
|
|
||||||
process_model = load_test_spec(
|
|
||||||
process_model_id="test_group/model_with_lanes",
|
|
||||||
bpmn_file_name="lanes_with_owner_dict.bpmn",
|
|
||||||
process_model_source_directory="model_with_lanes",
|
|
||||||
)
|
|
||||||
process_instance = self.create_process_instance_from_process_model(
|
|
||||||
process_model=process_model, user=initiator_user
|
|
||||||
)
|
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
|
||||||
assert process_instance.locked_by is None
|
|
||||||
assert process_instance.locked_at_in_seconds is None
|
|
||||||
processor.lock_process_instance("TEST")
|
|
||||||
|
|
||||||
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
|
|
||||||
assert process_instance.locked_by is not None
|
|
||||||
assert process_instance.locked_at_in_seconds is not None
|
|
||||||
|
|
||||||
with pytest.raises(ProcessInstanceIsAlreadyLockedError):
|
|
||||||
processor.lock_process_instance("TEST")
|
|
||||||
|
|
||||||
# with pytest.raises(ProcessInstanceLockedBySomethingElseError):
|
|
||||||
# processor.unlock_process_instance("TEST2")
|
|
||||||
|
|
||||||
processor.unlock_process_instance("TEST")
|
|
||||||
|
|
||||||
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
|
|
||||||
assert process_instance.locked_by is None
|
|
||||||
assert process_instance.locked_at_in_seconds is None
|
|
||||||
|
|
||||||
def test_it_can_loopback_to_previous_bpmn_task_with_gateway(
|
def test_it_can_loopback_to_previous_bpmn_task_with_gateway(
|
||||||
self,
|
self,
|
||||||
app: Flask,
|
app: Flask,
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
"""Test_process_instance_queue_service."""
|
||||||
|
from contextlib import suppress
|
||||||
|
|
||||||
|
from flask.app import Flask
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||||
|
|
||||||
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
|
from spiffworkflow_backend.services.process_instance_lock_service import (
|
||||||
|
ProcessInstanceLockService,
|
||||||
|
)
|
||||||
|
from spiffworkflow_backend.services.process_instance_queue_service import (
|
||||||
|
ProcessInstanceQueueService,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestProcessInstanceQueueService(BaseTest):
|
||||||
|
"""TestProcessInstanceQueueService."""
|
||||||
|
|
||||||
|
def _create_process_instance(self) -> ProcessInstanceModel:
|
||||||
|
initiator_user = self.find_or_create_user("initiator_user")
|
||||||
|
process_model = load_test_spec(
|
||||||
|
process_model_id="test_group/model_with_lanes",
|
||||||
|
bpmn_file_name="lanes.bpmn",
|
||||||
|
process_model_source_directory="model_with_lanes",
|
||||||
|
)
|
||||||
|
process_instance = self.create_process_instance_from_process_model(
|
||||||
|
process_model=process_model, user=initiator_user
|
||||||
|
)
|
||||||
|
return process_instance
|
||||||
|
|
||||||
|
def test_newly_created_process_instances_are_not_locked_when_added_to_the_queue(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
assert not ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None)
|
||||||
|
check_passed = False
|
||||||
|
for entry in queue_entries:
|
||||||
|
if entry.process_instance_id == process_instance.id:
|
||||||
|
assert entry.locked_by is None
|
||||||
|
check_passed = True
|
||||||
|
break
|
||||||
|
assert check_passed
|
||||||
|
|
||||||
|
def test_peek_many_can_see_queue_entries_with_a_given_status(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started")
|
||||||
|
assert process_instance.id in queue_entry_ids
|
||||||
|
|
||||||
|
def test_can_run_some_code_with_a_dequeued_process_instance(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
check_passed = False
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
check_passed = True
|
||||||
|
assert check_passed
|
||||||
|
|
||||||
|
def test_holds_a_lock_for_dequeued_process_instance(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
assert not ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
assert not ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
def test_unlocks_if_an_exception_is_thrown_with_a__dequeued_process_instance(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
|
||||||
|
with suppress(Exception):
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
raise Exception("just testing")
|
||||||
|
|
||||||
|
assert not ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
def test_can_call_dequeued_mulitple_times(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
def test_can_nest_multiple_dequeued_calls(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_instance = self._create_process_instance()
|
||||||
|
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
assert ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
|
||||||
|
assert not ProcessInstanceLockService.has_lock(process_instance.id)
|
Loading…
Reference in New Issue