diff --git a/bin/boot_server_in_docker b/bin/boot_server_in_docker index bd039722..ed52cac0 100755 --- a/bin/boot_server_in_docker +++ b/bin/boot_server_in_docker @@ -32,6 +32,7 @@ if [[ "${APPLICATION_ROOT:-}" != "/" ]]; then fi export IS_GUNICORN="true" +export PROCESS_WAITING_MESSAGES="true" # THIS MUST BE THE LAST COMMAND! exec poetry run gunicorn ${additional_args} --bind "0.0.0.0:$port" --workers=3 --timeout 90 --capture-output --access-logfile '-' --log-level debug wsgi:app diff --git a/bin/run_server_locally b/bin/run_server_locally index 1d9af679..63c273c1 100755 --- a/bin/run_server_locally +++ b/bin/run_server_locally @@ -22,5 +22,5 @@ export APPLICATION_ROOT="/" if [[ -n "${SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA:-}" ]]; then ./bin/boot_server_in_docker else - FLASK_APP=src/spiffworkflow_backend poetry run flask run -p 7000 + PROCESS_WAITING_MESSAGES="true" FLASK_APP=src/spiffworkflow_backend poetry run flask run -p 7000 fi diff --git a/migrations/versions/a5a93fe63899_.py b/migrations/versions/c7735c07773c_.py similarity index 93% rename from migrations/versions/a5a93fe63899_.py rename to migrations/versions/c7735c07773c_.py index 1f64e5c4..050b5032 100644 --- a/migrations/versions/a5a93fe63899_.py +++ b/migrations/versions/c7735c07773c_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: a5a93fe63899 +Revision ID: c7735c07773c Revises: -Create Date: 2022-07-27 08:51:44.791339 +Create Date: 2022-08-01 14:22:21.009841 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = 'a5a93fe63899' +revision = 'c7735c07773c' down_revision = None branch_labels = None depends_on = None @@ -31,6 +31,12 @@ def upgrade(): sa.Column('new_name_two', sa.String(length=255), nullable=True), sa.PrimaryKeyConstraint('id') ) + op.create_table('message_model', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=50), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_message_model_name'), 'message_model', ['name'], unique=True) op.create_table('user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('username', sa.String(length=50), nullable=False), @@ -135,8 +141,9 @@ def upgrade(): sa.Column('id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False), sa.Column('bpmn_element_id', sa.String(length=50), nullable=False), - sa.Column('correlation_name', sa.String(length=50), nullable=False), - sa.Column('correlation_value', sa.String(length=50), nullable=False), + sa.Column('messsage_type', sa.Enum('send', 'receive', name='messagetypes'), nullable=False), + sa.Column('message_model', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['message_model'], ['message_model.id'], ), sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), sa.PrimaryKeyConstraint('id') ) @@ -195,6 +202,8 @@ def downgrade(): op.drop_table('process_instance') op.drop_table('principal') op.drop_table('user') + op.drop_index(op.f('ix_message_model_name'), table_name='message_model') + op.drop_table('message_model') op.drop_table('group') op.drop_table('admin_session') # ### end Alembic commands ### diff --git a/poetry.lock b/poetry.lock index b5d60ac0..51e39662 100644 --- a/poetry.lock +++ b/poetry.lock @@ -43,6 +43,32 @@ python-versions = "*" [package.extras] dev = ["black", "coverage", "isort", "pre-commit", "pyenchant", "pylint"] +[[package]] +name = "apscheduler" +version = "3.9.1" +description = "In-process task scheduler with Cron-like capabilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" + +[package.dependencies] +pytz = "*" +six = ">=1.4.0" +tzlocal = ">=2.0,<3.0.0 || >=4.0.0" + +[package.extras] +asyncio = ["trollius"] +doc = ["sphinx", "sphinx-rtd-theme"] +gevent = ["gevent"] +mongodb = ["pymongo (>=3.0)"] +redis = ["redis (>=3.0)"] +rethinkdb = ["rethinkdb (>=2.4.0)"] +sqlalchemy = ["sqlalchemy (>=0.8)"] +testing = ["pytest", "pytest-cov", "pytest-tornado5", "mock", "pytest-asyncio (<0.6)", "pytest-asyncio"] +tornado = ["tornado (>=4.3)"] +twisted = ["twisted"] +zookeeper = ["kazoo"] + [[package]] name = "astroid" version = "2.11.6" @@ -2093,7 +2119,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "66503576ef158089a92526ed6982bc93481868a47fec14c47d1ce9a5bcc08979" +content-hash = "37bb9f834037be053e37ca34ba825e2be692554e4c668a4bae42560987d6e092" [metadata.files] alabaster = [ @@ -2112,6 +2138,7 @@ aniso8601 = [ {file = "aniso8601-9.0.1-py2.py3-none-any.whl", hash = "sha256:1d2b7ef82963909e93c4f24ce48d4de9e66009a21bf1c1e1c85bdd0812fe412f"}, {file = "aniso8601-9.0.1.tar.gz", hash = "sha256:72e3117667eedf66951bb2d93f4296a56b94b078a8a95905a052611fb3f1b973"}, ] +apscheduler = [] astroid = [ {file = "astroid-2.11.6-py3-none-any.whl", hash = "sha256:ba33a82a9a9c06a5ceed98180c5aab16e29c285b828d94696bf32d6015ea82a9"}, {file = "astroid-2.11.6.tar.gz", hash = "sha256:4f933d0bf5e408b03a6feb5d23793740c27e07340605f236496cd6ce552043d6"}, diff --git a/pyproject.toml b/pyproject.toml index 6cc94f6d..8342be7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ PyJWT = "^2.4.0" gunicorn = "^20.1.0" types-pytz = "^2022.1.1" python-keycloak = "^1.9.1" +APScheduler = "^3.9.1" [tool.poetry.dev-dependencies] diff --git a/src/spiffworkflow_backend/__init__.py b/src/spiffworkflow_backend/__init__.py index 45e4a066..a61c3084 100644 --- a/src/spiffworkflow_backend/__init__.py +++ b/src/spiffworkflow_backend/__init__.py @@ -5,6 +5,7 @@ from typing import Any import connexion # type: ignore import flask.app import flask.json +from apscheduler.schedulers.background import BackgroundScheduler from flask_bpmn.api.api_error import api_error_blueprint from flask_bpmn.models.db import db from flask_bpmn.models.db import migrate @@ -16,6 +17,7 @@ from spiffworkflow_backend.config import setup_config from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint from spiffworkflow_backend.routes.user_blueprint import user_blueprint +from spiffworkflow_backend.services.message_service import MessageServiceWithAppContext class MyJSONEncoder(flask.json.JSONEncoder): @@ -28,6 +30,12 @@ class MyJSONEncoder(flask.json.JSONEncoder): return super().default(obj) +# def process_queued_messages(app: flask.app.Flask): +# """Process_queued_messages.""" +# with app.app_context(): +# MessageService().process_queued_messages() + + def create_app() -> flask.app.Flask: """Create_app.""" # We need to create the sqlite database in a known location. @@ -71,4 +79,13 @@ def create_app() -> flask.app.Flask: app.json_encoder = MyJSONEncoder + if app.config["PROCESS_WAITING_MESSAGES"]: + scheduler = BackgroundScheduler() + scheduler.add_job( + MessageServiceWithAppContext(app).process_queued_messages_with_app_context, + "interval", + minutes=1, + ) + scheduler.start() + return app # type: ignore diff --git a/src/spiffworkflow_backend/config/default.py b/src/spiffworkflow_backend/config/default.py index 89f3aa72..c944baa9 100644 --- a/src/spiffworkflow_backend/config/default.py +++ b/src/spiffworkflow_backend/config/default.py @@ -7,3 +7,7 @@ CORS_DEFAULT = "*" CORS_ALLOW_ORIGINS = re.split( r",\s*", environ.get("CORS_ALLOW_ORIGINS", default=CORS_DEFAULT) ) + +PROCESS_WAITING_MESSAGES = ( + environ.get("PROCESS_WAITING_MESSAGES", default="false") == "true" +) diff --git a/src/spiffworkflow_backend/models/message_correlation.py b/src/spiffworkflow_backend/models/message_correlation.py new file mode 100644 index 00000000..9bddeb1c --- /dev/null +++ b/src/spiffworkflow_backend/models/message_correlation.py @@ -0,0 +1,25 @@ +"""Message_correlation.""" +from dataclasses import dataclass + +from flask_bpmn.models.db import db +from flask_bpmn.models.db import SpiffworkflowBaseDBModel +from sqlalchemy import ForeignKey + +from spiffworkflow_backend.models.message_instance import MessageInstanceModel + + +@dataclass +class MessageCorrelationModel(SpiffworkflowBaseDBModel): + """Message Correlations to relate queued messages together.""" + + __tablename__ = "message_correlation" + __table_args__ = ( + db.UniqueConstraint( + "message_id", "name", name="message_id_name_unique" + ), + ) + + id = db.Column(db.Integer, primary_key=True) + message_id = db.Column(ForeignKey(MessageInstanceModel.id), nullable=False, index=True) + name = db.Column(db.String(50), nullable=False, index=True) + value = db.Column(db.String(50), nullable=False, index=True) diff --git a/src/spiffworkflow_backend/models/queued_send_message.py b/src/spiffworkflow_backend/models/message_instance.py similarity index 59% rename from src/spiffworkflow_backend/models/queued_send_message.py rename to src/spiffworkflow_backend/models/message_instance.py index 13df3297..b4d10763 100644 --- a/src/spiffworkflow_backend/models/queued_send_message.py +++ b/src/spiffworkflow_backend/models/message_instance.py @@ -1,15 +1,25 @@ -"""Principal.""" +"""Message_instance.""" +import enum + from dataclasses import dataclass from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel +from sqlalchemy import Enum from sqlalchemy import ForeignKey from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.message_model import MessageModel + + +class MessageTypes(enum.Enum): + """MessageTypes.""" + send = 1 + receive = 2 @dataclass -class QueuedSendMessageModel(SpiffworkflowBaseDBModel): +class MessageInstanceModel(SpiffworkflowBaseDBModel): """Messages from a process instance that are ready to send to a receiving task.""" __tablename__ = "queued_send_message" @@ -17,5 +27,5 @@ class QueuedSendMessageModel(SpiffworkflowBaseDBModel): id = db.Column(db.Integer, primary_key=True) process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore bpmn_element_id = db.Column(db.String(50), nullable=False) - correlation_name = db.Column(db.String(50), nullable=False) - correlation_value = db.Column(db.String(50), nullable=False) + messsage_type = db.Column(Enum(MessageTypes), nullable=False) + message_model = db.Column(ForeignKey(MessageModel.id), nullable=False) diff --git a/src/spiffworkflow_backend/models/message_model.py b/src/spiffworkflow_backend/models/message_model.py new file mode 100644 index 00000000..2780349e --- /dev/null +++ b/src/spiffworkflow_backend/models/message_model.py @@ -0,0 +1,13 @@ +"""Message_model.""" + +from flask_bpmn.models.db import db +from flask_bpmn.models.db import SpiffworkflowBaseDBModel + + +class MessageModel(SpiffworkflowBaseDBModel): + """MessageModel.""" + + __tablename__ = "message_model" + + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(50), unique=True, index=True) diff --git a/src/spiffworkflow_backend/models/queued_receive_message.py b/src/spiffworkflow_backend/models/queued_receive_message.py deleted file mode 100644 index cfbd3bf1..00000000 --- a/src/spiffworkflow_backend/models/queued_receive_message.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Principal.""" -from dataclasses import dataclass - -from flask_bpmn.models.db import db -from flask_bpmn.models.db import SpiffworkflowBaseDBModel -from sqlalchemy import ForeignKey - -from spiffworkflow_backend.models.process_instance import ProcessInstanceModel - - -@dataclass -class QueuedReceiveMessageModel(SpiffworkflowBaseDBModel): - """Messages from a process instance that are ready to receive a message from a task.""" - - __tablename__ = "queued_receive_message" - - id = db.Column(db.Integer, primary_key=True) - process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore - bpmn_element_id = db.Column(db.String(50), nullable=False) - correlation_name = db.Column(db.String(50), nullable=False) - correlation_value = db.Column(db.String(50), nullable=False) diff --git a/src/spiffworkflow_backend/services/message_service.py b/src/spiffworkflow_backend/services/message_service.py new file mode 100644 index 00000000..1e626428 --- /dev/null +++ b/src/spiffworkflow_backend/services/message_service.py @@ -0,0 +1,32 @@ +"""Message_service.""" + +import flask +from spiffworkflow_backend.models.user import UserModel + + +class MessageServiceWithAppContext: + """Wrapper for Message Service. + + This wrappers is to facilitate running the MessageService from the scheduler + since we need to specify the app context then. + """ + + def __init__(self, app: flask.app.Flask): + """__init__.""" + self.app = app + + def process_queued_messages_with_app_context(self): + """Since this runs in a scheduler, we need to specify the app context as well.""" + with self.app.app_context(): + MessageService().process_queued_messages() + + +class MessageService: + """MessageService.""" + + def process_queued_messages(self): + """Process_queued_messages.""" + user = UserModel.query.first() + print(f"user: {user}") + # MessageInstanceModel.query + # if diff --git a/src/spiffworkflow_backend/services/process_instance_processor.py b/src/spiffworkflow_backend/services/process_instance_processor.py index 8bcb5dbb..45edca16 100644 --- a/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/src/spiffworkflow_backend/services/process_instance_processor.py @@ -35,13 +35,12 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from spiffworkflow_backend.models.active_task import ActiveTaskModel from spiffworkflow_backend.models.file import File from spiffworkflow_backend.models.file import FileType +from spiffworkflow_backend.models.message_instance import MessageInstanceModel +from spiffworkflow_backend.models.message_instance import MessageModel from spiffworkflow_backend.models.principal import PrincipalModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_model import ProcessModelInfo -from spiffworkflow_backend.models.queued_receive_message import ( - QueuedReceiveMessageModel, -) from spiffworkflow_backend.models.task_event import TaskAction from spiffworkflow_backend.models.task_event import TaskEventModel from spiffworkflow_backend.models.user import UserModelSchema @@ -450,34 +449,48 @@ class ProcessInstanceProcessor: """Process_bpmn_events.""" if self.bpmn_process_instance.bpmn_events: for bpmn_event in self.bpmn_process_instance.bpmn_events: - # FIXME: pseudocode - not sure how to determine message types yet - if bpmn_event.event == "ReceivedEvent": - queued_message = QueuedReceiveMessageModel( - process_instance_id=self.process_instance_model.id, - bpmn_element_id=bpmn_event.task_name, + message_type = None + + # TODO: message: who knows the of the message model? + # will it be in the bpmn_event? + message_model = MessageModel.query.filter_by( + name=bpmn_event.message_name + ) + + if message_model is None: + raise ApiError( + "invalid_message_name", + f"Invalid message name: {bpmn_event.message_name}.", ) - db.session.add(queued_message) - db.session.commit() + + # TODO: message - not sure how to determine message types yet + if bpmn_event.event == "WaitEvent": # and waiting for message: + message_type = "receive" elif bpmn_event.event == "SendEvent": - queued_receive_message = ( - QueuedReceiveMessageModel.query.filter_by().first() + message_type = "send" + + if message_type is None: + raise ApiError( + "invalid_event_type", + f"Invalid event type for a message: {bpmn_event.event}.", ) - if queued_receive_message: - process_instance = ProcessInstanceModel.query.filter_by( - process_instance_id=queued_receive_message.process_instance_id - ).first() - if process_instance is None: - raise ApiError( - "process_instance_not_found", - f"The process instance - {queued_receive_message.process_instance_id} - for receiving event - {queued_receive_message.id} - cannot be found", - ) + + queued_message = MessageInstanceModel( + process_instance_id=self.process_instance_model.id, + bpmn_element_id=bpmn_event.task_name, + message_type=message_type, + message_model_id=message_model.id, + ) + db.session.add(queued_message) + db.session.commit() def do_engine_steps(self, exit_at: None = None) -> None: """Do_engine_steps.""" try: self.bpmn_process_instance.refresh_waiting_tasks() self.bpmn_process_instance.do_engine_steps(exit_at=exit_at) - self.process_bpmn_events() + # TODO: run this + # self.process_bpmn_events() except WorkflowTaskExecException as we: raise ApiError.from_workflow_exception("task_error", str(we), we) from we diff --git a/tests/spiffworkflow_backend/helpers/test_data.py b/tests/spiffworkflow_backend/helpers/test_data.py index 9afee257..3f8600e2 100644 --- a/tests/spiffworkflow_backend/helpers/test_data.py +++ b/tests/spiffworkflow_backend/helpers/test_data.py @@ -1,13 +1,18 @@ """User.""" +import json +from time import time from typing import Dict from typing import Optional +from flask_bpmn.models.db import db from tests.spiffworkflow_backend.helpers.example_data import ExampleDataLoader from spiffworkflow_backend.exceptions.process_entity_not_found_error import ( ProcessEntityNotFoundError, ) +from spiffworkflow_backend.helpers.fixture_data import find_or_create_user from spiffworkflow_backend.models.process_group import ProcessGroup +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.process_model_service import ProcessModelService @@ -68,6 +73,27 @@ def load_test_spec( return spec +def create_process_instance_from_process_model( + process_model: ProcessModelInfo, status: str +): + """Create_process_instance_from_process_model.""" + user = find_or_create_user() + current_time = round(time.time()) + process_instance = ProcessInstanceModel( + status=status, + process_initiator=user, + process_model_identifier=process_model.id, + process_group_identifier=process_model.process_group_id, + updated_at_in_seconds=round(time.time()), + start_in_seconds=current_time - (3600 * 1), + end_in_seconds=current_time - (3600 * 1 - 20), + bpmn_json=json.dumps({"ikey": "ivalue"}), + ) + db.session.add(process_instance) + db.session.commit() + return process_instance + + # def user_info_to_query_string(user_info, redirect_url): # query_string_list = [] # items = user_info.items() diff --git a/tests/spiffworkflow_backend/unit/test_message_service.py b/tests/spiffworkflow_backend/unit/test_message_service.py new file mode 100644 index 00000000..bbe10c18 --- /dev/null +++ b/tests/spiffworkflow_backend/unit/test_message_service.py @@ -0,0 +1,18 @@ +"""test_message_service.""" +from spiffworkflow_backend.services.message_model import MessageModel +from spiffworkflow_backend.services.message_instance import MessageInstanceModel +from tests.spiffworkflow_backend.helpers.test_data import create_process_instance_from_process_model + + +def test_can_send_message_to_waiting_message() -> None: + """Test_can_send_message_to_waiting_message.""" + message_model_name = "message_model_one" + message_model = MessageModel(name=message_model_name) + process_instance = create_process_instance_from_process_model + + queued_message_send = MessageInstanceModel( + process_instance_id=process_instance.id, + bpmn_element_id="something", + message_type="send", + message_model=message_model, + )