diff --git a/bin/recreate_db b/bin/recreate_db index 5a1aa3f7..ecb8fbfc 100755 --- a/bin/recreate_db +++ b/bin/recreate_db @@ -29,10 +29,10 @@ if [[ "${1:-}" == "clean" ]]; then if [[ "${SPIFF_DATABASE_TYPE:-}" == "postgres" ]]; then if ! docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_testing -c "select 1"; then docker run --name postgres-spiff -p 5432:5432 -e POSTGRES_PASSWORD=spiffworkflow_backend -e POSTGRES_USER=spiffworkflow_backend -e POSTGRES_DB=spiffworkflow_backend_testing -d postgres - sleep 4 # classy - - # create other db + fi + if ! docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_development -c "select 1"; then + # create other db. spiffworkflow_backend_testing came with the docker run. docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_testing -c "create database spiffworkflow_backend_development;" fi fi diff --git a/src/spiffworkflow_backend/__init__.py b/src/spiffworkflow_backend/__init__.py index 14c8e49f..5780e2f7 100644 --- a/src/spiffworkflow_backend/__init__.py +++ b/src/spiffworkflow_backend/__init__.py @@ -17,7 +17,9 @@ from spiffworkflow_backend.config import setup_config from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint from spiffworkflow_backend.routes.user_blueprint import user_blueprint -from spiffworkflow_backend.services.message_service import MessageServiceWithAppContext +from spiffworkflow_backend.services.background_processing_service import ( + BackgroundProcessingService, +) class MyJSONEncoder(flask.json.JSONEncoder): @@ -34,10 +36,15 @@ def start_scheduler(app: flask.app.Flask) -> None: """Start_scheduler.""" scheduler = BackgroundScheduler() scheduler.add_job( - MessageServiceWithAppContext(app).process_message_instances_with_app_context, + BackgroundProcessingService(app).process_message_instances_with_app_context, "interval", seconds=10, ) + scheduler.add_job( + BackgroundProcessingService(app).run, + "interval", + seconds=30, + ) scheduler.start() diff --git a/src/spiffworkflow_backend/models/process_instance.py b/src/spiffworkflow_backend/models/process_instance.py index 5ecc770f..ea9fa615 100644 --- a/src/spiffworkflow_backend/models/process_instance.py +++ b/src/spiffworkflow_backend/models/process_instance.py @@ -79,6 +79,7 @@ class ProcessInstanceStatus(SpiffEnum): faulted = "faulted" suspended = "suspended" terminated = "terminated" + erroring = "erroring" class ProcessInstanceModel(SpiffworkflowBaseDBModel): diff --git a/src/spiffworkflow_backend/services/background_processing_service.py b/src/spiffworkflow_backend/services/background_processing_service.py new file mode 100644 index 00000000..08a2b02d --- /dev/null +++ b/src/spiffworkflow_backend/services/background_processing_service.py @@ -0,0 +1,25 @@ +"""Background_processing_service.""" +import flask + +from spiffworkflow_backend.services.message_service import MessageService +from spiffworkflow_backend.services.process_instance_service import ( + ProcessInstanceService, +) + + +class BackgroundProcessingService: + """Used to facilitate doing work outside of an HTTP request/response.""" + + def __init__(self, app: flask.app.Flask): + """__init__.""" + self.app = app + + def run(self) -> None: + """Since this runs in a scheduler, we need to specify the app context as well.""" + with self.app.app_context(): + ProcessInstanceService.do_waiting() + + def process_message_instances_with_app_context(self) -> None: + """Since this runs in a scheduler, we need to specify the app context as well.""" + with self.app.app_context(): + MessageService.process_message_instances() diff --git a/src/spiffworkflow_backend/services/message_service.py b/src/spiffworkflow_backend/services/message_service.py index d1a1f8c1..822404de 100644 --- a/src/spiffworkflow_backend/services/message_service.py +++ b/src/spiffworkflow_backend/services/message_service.py @@ -2,7 +2,6 @@ from typing import Any from typing import Optional -import flask from flask_bpmn.models.db import db from sqlalchemy import and_ from sqlalchemy import or_ @@ -26,23 +25,6 @@ from spiffworkflow_backend.services.process_instance_service import ( ) -class MessageServiceWithAppContext: - """Wrapper for Message Service. - - This wrappers is to facilitate running the MessageService from the scheduler - since we need to specify the app context then. - """ - - def __init__(self, app: flask.app.Flask): - """__init__.""" - self.app = app - - def process_message_instances_with_app_context(self) -> None: - """Since this runs in a scheduler, we need to specify the app context as well.""" - with self.app.app_context(): - MessageService.process_message_instances() - - class MessageServiceError(Exception): """MessageServiceError.""" diff --git a/src/spiffworkflow_backend/services/process_instance_service.py b/src/spiffworkflow_backend/services/process_instance_service.py index cafe9776..6db9312b 100644 --- a/src/spiffworkflow_backend/services/process_instance_service.py +++ b/src/spiffworkflow_backend/services/process_instance_service.py @@ -58,6 +58,30 @@ class ProcessInstanceService: db.session.commit() return process_instance_model + @staticmethod + def do_waiting() -> None: + """Do_waiting.""" + records = ( + db.session.query(ProcessInstanceModel) + .filter(ProcessInstanceModel.status == ProcessInstanceStatus.waiting.value) + .all() + ) + for process_instance in records: + try: + current_app.logger.info( + f"Processing process_instance {process_instance.id}" + ) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + except Exception: + db.session.rollback() # in case the above left the database with a bad transaction + process_instance.status = ProcessInstanceStatus.erroring.value + db.session.add(process_instance) + db.session.commit() + error_message = f"Error running waiting task for process_instance {process_instance.id}" + "({process_instance.process_model_identifier}). {str(e)}" + current_app.logger.error(error_message) + @staticmethod def processor_to_process_instance_api( processor: ProcessInstanceProcessor, next_task: None = None diff --git a/tests/spiffworkflow_backend/unit/test_message_service.py b/tests/spiffworkflow_backend/unit/test_message_service.py index 3b796120..fe72c926 100644 --- a/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/tests/spiffworkflow_backend/unit/test_message_service.py @@ -188,8 +188,10 @@ class TestMessageService(BaseTest): process_instance_result = ProcessInstanceModel.query.all() assert len(process_instance_result) == 3 - process_instance_receiver_one = process_instance_result[1] - process_instance_receiver_two = process_instance_result[2] + process_instance_receiver_one = ProcessInstanceModel.query.filter_by(process_model_identifier='message_receiver_one').first() + assert process_instance_receiver_one is not None + process_instance_receiver_two = ProcessInstanceModel.query.filter_by(process_model_identifier='message_receiver_two').first() + assert process_instance_receiver_two is not None # just make sure it's a different process instance assert (