Merge pull request #95 from sartography/feature/background-process-waiting-tasks

Feature/background process waiting tasks
This commit is contained in:
Kevin Burnett 2022-09-20 19:34:20 +00:00 committed by GitHub
commit adbc521cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 66 additions and 25 deletions

View File

@ -29,10 +29,10 @@ if [[ "${1:-}" == "clean" ]]; then
if [[ "${SPIFF_DATABASE_TYPE:-}" == "postgres" ]]; then
if ! docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_testing -c "select 1"; then
docker run --name postgres-spiff -p 5432:5432 -e POSTGRES_PASSWORD=spiffworkflow_backend -e POSTGRES_USER=spiffworkflow_backend -e POSTGRES_DB=spiffworkflow_backend_testing -d postgres
sleep 4 # classy
# create other db
fi
if ! docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_development -c "select 1"; then
# create other db. spiffworkflow_backend_testing came with the docker run.
docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_testing -c "create database spiffworkflow_backend_development;"
fi
fi

View File

@ -17,7 +17,9 @@ from spiffworkflow_backend.config import setup_config
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.message_service import MessageServiceWithAppContext
from spiffworkflow_backend.services.background_processing_service import (
BackgroundProcessingService,
)
class MyJSONEncoder(flask.json.JSONEncoder):
@ -34,10 +36,15 @@ def start_scheduler(app: flask.app.Flask) -> None:
"""Start_scheduler."""
scheduler = BackgroundScheduler()
scheduler.add_job(
MessageServiceWithAppContext(app).process_message_instances_with_app_context,
BackgroundProcessingService(app).process_message_instances_with_app_context,
"interval",
seconds=10,
)
scheduler.add_job(
BackgroundProcessingService(app).run,
"interval",
seconds=30,
)
scheduler.start()

View File

@ -79,6 +79,7 @@ class ProcessInstanceStatus(SpiffEnum):
faulted = "faulted"
suspended = "suspended"
terminated = "terminated"
erroring = "erroring"
class ProcessInstanceModel(SpiffworkflowBaseDBModel):

View File

@ -0,0 +1,25 @@
"""Background_processing_service."""
import flask
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
class BackgroundProcessingService:
"""Used to facilitate doing work outside of an HTTP request/response."""
def __init__(self, app: flask.app.Flask):
"""__init__."""
self.app = app
def run(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
ProcessInstanceService.do_waiting()
def process_message_instances_with_app_context(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService.process_message_instances()

View File

@ -2,7 +2,6 @@
from typing import Any
from typing import Optional
import flask
from flask_bpmn.models.db import db
from sqlalchemy import and_
from sqlalchemy import or_
@ -26,23 +25,6 @@ from spiffworkflow_backend.services.process_instance_service import (
)
class MessageServiceWithAppContext:
"""Wrapper for Message Service.
This wrappers is to facilitate running the MessageService from the scheduler
since we need to specify the app context then.
"""
def __init__(self, app: flask.app.Flask):
"""__init__."""
self.app = app
def process_message_instances_with_app_context(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService.process_message_instances()
class MessageServiceError(Exception):
"""MessageServiceError."""

View File

@ -58,6 +58,30 @@ class ProcessInstanceService:
db.session.commit()
return process_instance_model
@staticmethod
def do_waiting() -> None:
"""Do_waiting."""
records = (
db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.status == ProcessInstanceStatus.waiting.value)
.all()
)
for process_instance in records:
try:
current_app.logger.info(
f"Processing process_instance {process_instance.id}"
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
except Exception:
db.session.rollback() # in case the above left the database with a bad transaction
process_instance.status = ProcessInstanceStatus.erroring.value
db.session.add(process_instance)
db.session.commit()
error_message = f"Error running waiting task for process_instance {process_instance.id}"
"({process_instance.process_model_identifier}). {str(e)}"
current_app.logger.error(error_message)
@staticmethod
def processor_to_process_instance_api(
processor: ProcessInstanceProcessor, next_task: None = None

View File

@ -188,8 +188,10 @@ class TestMessageService(BaseTest):
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 3
process_instance_receiver_one = process_instance_result[1]
process_instance_receiver_two = process_instance_result[2]
process_instance_receiver_one = ProcessInstanceModel.query.filter_by(process_model_identifier='message_receiver_one').first()
assert process_instance_receiver_one is not None
process_instance_receiver_two = ProcessInstanceModel.query.filter_by(process_model_identifier='message_receiver_two').first()
assert process_instance_receiver_two is not None
# just make sure it's a different process instance
assert (