some more framework for messages

This commit is contained in:
jasquat 2022-08-01 16:44:51 -04:00
parent f9350f0a84
commit df26c77fd7
15 changed files with 229 additions and 54 deletions

View File

@ -32,6 +32,7 @@ if [[ "${APPLICATION_ROOT:-}" != "/" ]]; then
fi
export IS_GUNICORN="true"
export PROCESS_WAITING_MESSAGES="true"
# THIS MUST BE THE LAST COMMAND!
exec poetry run gunicorn ${additional_args} --bind "0.0.0.0:$port" --workers=3 --timeout 90 --capture-output --access-logfile '-' --log-level debug wsgi:app

View File

@ -22,5 +22,5 @@ export APPLICATION_ROOT="/"
if [[ -n "${SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA:-}" ]]; then
./bin/boot_server_in_docker
else
FLASK_APP=src/spiffworkflow_backend poetry run flask run -p 7000
PROCESS_WAITING_MESSAGES="true" FLASK_APP=src/spiffworkflow_backend poetry run flask run -p 7000
fi

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: a5a93fe63899
Revision ID: c7735c07773c
Revises:
Create Date: 2022-07-27 08:51:44.791339
Create Date: 2022-08-01 14:22:21.009841
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a5a93fe63899'
revision = 'c7735c07773c'
down_revision = None
branch_labels = None
depends_on = None
@ -31,6 +31,12 @@ def upgrade():
sa.Column('new_name_two', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('message_model',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_message_model_name'), 'message_model', ['name'], unique=True)
op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=50), nullable=False),
@ -135,8 +141,9 @@ def upgrade():
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('bpmn_element_id', sa.String(length=50), nullable=False),
sa.Column('correlation_name', sa.String(length=50), nullable=False),
sa.Column('correlation_value', sa.String(length=50), nullable=False),
sa.Column('messsage_type', sa.Enum('send', 'receive', name='messagetypes'), nullable=False),
sa.Column('message_model', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['message_model'], ['message_model.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
@ -195,6 +202,8 @@ def downgrade():
op.drop_table('process_instance')
op.drop_table('principal')
op.drop_table('user')
op.drop_index(op.f('ix_message_model_name'), table_name='message_model')
op.drop_table('message_model')
op.drop_table('group')
op.drop_table('admin_session')
# ### end Alembic commands ###

29
poetry.lock generated
View File

@ -43,6 +43,32 @@ python-versions = "*"
[package.extras]
dev = ["black", "coverage", "isort", "pre-commit", "pyenchant", "pylint"]
[[package]]
name = "apscheduler"
version = "3.9.1"
description = "In-process task scheduler with Cron-like capabilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
[package.dependencies]
pytz = "*"
six = ">=1.4.0"
tzlocal = ">=2.0,<3.0.0 || >=4.0.0"
[package.extras]
asyncio = ["trollius"]
doc = ["sphinx", "sphinx-rtd-theme"]
gevent = ["gevent"]
mongodb = ["pymongo (>=3.0)"]
redis = ["redis (>=3.0)"]
rethinkdb = ["rethinkdb (>=2.4.0)"]
sqlalchemy = ["sqlalchemy (>=0.8)"]
testing = ["pytest", "pytest-cov", "pytest-tornado5", "mock", "pytest-asyncio (<0.6)", "pytest-asyncio"]
tornado = ["tornado (>=4.3)"]
twisted = ["twisted"]
zookeeper = ["kazoo"]
[[package]]
name = "astroid"
version = "2.11.6"
@ -2093,7 +2119,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "66503576ef158089a92526ed6982bc93481868a47fec14c47d1ce9a5bcc08979"
content-hash = "37bb9f834037be053e37ca34ba825e2be692554e4c668a4bae42560987d6e092"
[metadata.files]
alabaster = [
@ -2112,6 +2138,7 @@ aniso8601 = [
{file = "aniso8601-9.0.1-py2.py3-none-any.whl", hash = "sha256:1d2b7ef82963909e93c4f24ce48d4de9e66009a21bf1c1e1c85bdd0812fe412f"},
{file = "aniso8601-9.0.1.tar.gz", hash = "sha256:72e3117667eedf66951bb2d93f4296a56b94b078a8a95905a052611fb3f1b973"},
]
apscheduler = []
astroid = [
{file = "astroid-2.11.6-py3-none-any.whl", hash = "sha256:ba33a82a9a9c06a5ceed98180c5aab16e29c285b828d94696bf32d6015ea82a9"},
{file = "astroid-2.11.6.tar.gz", hash = "sha256:4f933d0bf5e408b03a6feb5d23793740c27e07340605f236496cd6ce552043d6"},

View File

@ -47,6 +47,7 @@ PyJWT = "^2.4.0"
gunicorn = "^20.1.0"
types-pytz = "^2022.1.1"
python-keycloak = "^1.9.1"
APScheduler = "^3.9.1"
[tool.poetry.dev-dependencies]

View File

@ -5,6 +5,7 @@ from typing import Any
import connexion # type: ignore
import flask.app
import flask.json
from apscheduler.schedulers.background import BackgroundScheduler
from flask_bpmn.api.api_error import api_error_blueprint
from flask_bpmn.models.db import db
from flask_bpmn.models.db import migrate
@ -16,6 +17,7 @@ from spiffworkflow_backend.config import setup_config
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.message_service import MessageServiceWithAppContext
class MyJSONEncoder(flask.json.JSONEncoder):
@ -28,6 +30,12 @@ class MyJSONEncoder(flask.json.JSONEncoder):
return super().default(obj)
# def process_queued_messages(app: flask.app.Flask):
# """Process_queued_messages."""
# with app.app_context():
# MessageService().process_queued_messages()
def create_app() -> flask.app.Flask:
"""Create_app."""
# We need to create the sqlite database in a known location.
@ -71,4 +79,13 @@ def create_app() -> flask.app.Flask:
app.json_encoder = MyJSONEncoder
if app.config["PROCESS_WAITING_MESSAGES"]:
scheduler = BackgroundScheduler()
scheduler.add_job(
MessageServiceWithAppContext(app).process_queued_messages_with_app_context,
"interval",
minutes=1,
)
scheduler.start()
return app # type: ignore

View File

@ -7,3 +7,7 @@ CORS_DEFAULT = "*"
CORS_ALLOW_ORIGINS = re.split(
r",\s*", environ.get("CORS_ALLOW_ORIGINS", default=CORS_DEFAULT)
)
PROCESS_WAITING_MESSAGES = (
environ.get("PROCESS_WAITING_MESSAGES", default="false") == "true"
)

View File

@ -0,0 +1,25 @@
"""Message_correlation."""
from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass
class MessageCorrelationModel(SpiffworkflowBaseDBModel):
"""Message Correlations to relate queued messages together."""
__tablename__ = "message_correlation"
__table_args__ = (
db.UniqueConstraint(
"message_id", "name", name="message_id_name_unique"
),
)
id = db.Column(db.Integer, primary_key=True)
message_id = db.Column(ForeignKey(MessageInstanceModel.id), nullable=False, index=True)
name = db.Column(db.String(50), nullable=False, index=True)
value = db.Column(db.String(50), nullable=False, index=True)

View File

@ -1,15 +1,25 @@
"""Principal."""
"""Message_instance."""
import enum
from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import Enum
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel
class MessageTypes(enum.Enum):
"""MessageTypes."""
send = 1
receive = 2
@dataclass
class QueuedSendMessageModel(SpiffworkflowBaseDBModel):
class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Messages from a process instance that are ready to send to a receiving task."""
__tablename__ = "queued_send_message"
@ -17,5 +27,5 @@ class QueuedSendMessageModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
bpmn_element_id = db.Column(db.String(50), nullable=False)
correlation_name = db.Column(db.String(50), nullable=False)
correlation_value = db.Column(db.String(50), nullable=False)
messsage_type = db.Column(Enum(MessageTypes), nullable=False)
message_model = db.Column(ForeignKey(MessageModel.id), nullable=False)

View File

@ -0,0 +1,13 @@
"""Message_model."""
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
class MessageModel(SpiffworkflowBaseDBModel):
"""MessageModel."""
__tablename__ = "message_model"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, index=True)

View File

@ -1,21 +0,0 @@
"""Principal."""
from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@dataclass
class QueuedReceiveMessageModel(SpiffworkflowBaseDBModel):
"""Messages from a process instance that are ready to receive a message from a task."""
__tablename__ = "queued_receive_message"
id = db.Column(db.Integer, primary_key=True)
process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
bpmn_element_id = db.Column(db.String(50), nullable=False)
correlation_name = db.Column(db.String(50), nullable=False)
correlation_value = db.Column(db.String(50), nullable=False)

View File

@ -0,0 +1,32 @@
"""Message_service."""
import flask
from spiffworkflow_backend.models.user import UserModel
class MessageServiceWithAppContext:
"""Wrapper for Message Service.
This wrappers is to facilitate running the MessageService from the scheduler
since we need to specify the app context then.
"""
def __init__(self, app: flask.app.Flask):
"""__init__."""
self.app = app
def process_queued_messages_with_app_context(self):
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService().process_queued_messages()
class MessageService:
"""MessageService."""
def process_queued_messages(self):
"""Process_queued_messages."""
user = UserModel.query.first()
print(f"user: {user}")
# MessageInstanceModel.query
# if

View File

@ -35,13 +35,12 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.queued_receive_message import (
QueuedReceiveMessageModel,
)
from spiffworkflow_backend.models.task_event import TaskAction
from spiffworkflow_backend.models.task_event import TaskEventModel
from spiffworkflow_backend.models.user import UserModelSchema
@ -450,34 +449,48 @@ class ProcessInstanceProcessor:
"""Process_bpmn_events."""
if self.bpmn_process_instance.bpmn_events:
for bpmn_event in self.bpmn_process_instance.bpmn_events:
# FIXME: pseudocode - not sure how to determine message types yet
if bpmn_event.event == "ReceivedEvent":
queued_message = QueuedReceiveMessageModel(
process_instance_id=self.process_instance_model.id,
bpmn_element_id=bpmn_event.task_name,
message_type = None
# TODO: message: who knows the of the message model?
# will it be in the bpmn_event?
message_model = MessageModel.query.filter_by(
name=bpmn_event.message_name
)
if message_model is None:
raise ApiError(
"invalid_message_name",
f"Invalid message name: {bpmn_event.message_name}.",
)
db.session.add(queued_message)
db.session.commit()
# TODO: message - not sure how to determine message types yet
if bpmn_event.event == "WaitEvent": # and waiting for message:
message_type = "receive"
elif bpmn_event.event == "SendEvent":
queued_receive_message = (
QueuedReceiveMessageModel.query.filter_by().first()
message_type = "send"
if message_type is None:
raise ApiError(
"invalid_event_type",
f"Invalid event type for a message: {bpmn_event.event}.",
)
if queued_receive_message:
process_instance = ProcessInstanceModel.query.filter_by(
process_instance_id=queued_receive_message.process_instance_id
).first()
if process_instance is None:
raise ApiError(
"process_instance_not_found",
f"The process instance - {queued_receive_message.process_instance_id} - for receiving event - {queued_receive_message.id} - cannot be found",
)
queued_message = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
bpmn_element_id=bpmn_event.task_name,
message_type=message_type,
message_model_id=message_model.id,
)
db.session.add(queued_message)
db.session.commit()
def do_engine_steps(self, exit_at: None = None) -> None:
"""Do_engine_steps."""
try:
self.bpmn_process_instance.refresh_waiting_tasks()
self.bpmn_process_instance.do_engine_steps(exit_at=exit_at)
self.process_bpmn_events()
# TODO: run this
# self.process_bpmn_events()
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we

View File

@ -1,13 +1,18 @@
"""User."""
import json
from time import time
from typing import Dict
from typing import Optional
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.example_data import ExampleDataLoader
from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
ProcessEntityNotFoundError,
)
from spiffworkflow_backend.helpers.fixture_data import find_or_create_user
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -68,6 +73,27 @@ def load_test_spec(
return spec
def create_process_instance_from_process_model(
process_model: ProcessModelInfo, status: str
):
"""Create_process_instance_from_process_model."""
user = find_or_create_user()
current_time = round(time.time())
process_instance = ProcessInstanceModel(
status=status,
process_initiator=user,
process_model_identifier=process_model.id,
process_group_identifier=process_model.process_group_id,
updated_at_in_seconds=round(time.time()),
start_in_seconds=current_time - (3600 * 1),
end_in_seconds=current_time - (3600 * 1 - 20),
bpmn_json=json.dumps({"ikey": "ivalue"}),
)
db.session.add(process_instance)
db.session.commit()
return process_instance
# def user_info_to_query_string(user_info, redirect_url):
# query_string_list = []
# items = user_info.items()

View File

@ -0,0 +1,18 @@
"""test_message_service."""
from spiffworkflow_backend.services.message_model import MessageModel
from spiffworkflow_backend.services.message_instance import MessageInstanceModel
from tests.spiffworkflow_backend.helpers.test_data import create_process_instance_from_process_model
def test_can_send_message_to_waiting_message() -> None:
"""Test_can_send_message_to_waiting_message."""
message_model_name = "message_model_one"
message_model = MessageModel(name=message_model_name)
process_instance = create_process_instance_from_process_model
queued_message_send = MessageInstanceModel(
process_instance_id=process_instance.id,
bpmn_element_id="something",
message_type="send",
message_model=message_model,
)