Merge remote-tracking branch 'origin/main' into feature/background-process-waiting-tasks

This commit is contained in:
burnettk 2022-09-20 14:59:05 -04:00
commit a03fef847f
11 changed files with 160 additions and 35 deletions

View File

@ -20,7 +20,6 @@ from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.background_processing_service import (
BackgroundProcessingService,
)
from spiffworkflow_backend.services.message_service import MessageServiceWithAppContext
class MyJSONEncoder(flask.json.JSONEncoder):
@ -37,14 +36,14 @@ def start_scheduler(app: flask.app.Flask) -> None:
"""Start_scheduler."""
scheduler = BackgroundScheduler()
scheduler.add_job(
MessageServiceWithAppContext(app).process_message_instances_with_app_context,
BackgroundProcessingService(app).process_message_instances_with_app_context,
"interval",
seconds=10,
)
scheduler.add_job(
BackgroundProcessingService(app).run,
"interval",
seconds=5,
seconds=30,
)
scheduler.start()

View File

@ -917,6 +917,44 @@ paths:
schema:
$ref: "#/components/schemas/OkTrue"
/messages:
parameters:
- name: process_instance_id
in: query
required: false
description: the id of the process instance
schema:
type: integer
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The number of models to show per page. Defaults to page 10.
schema:
type: integer
get:
tags:
- Messages
operationId: spiffworkflow_backend.routes.process_api_blueprint.message_instance_list
summary: Get a list of message instances
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
responses:
"200":
description: One task
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/messages/{message_identifier}:
parameters:
- name: message_identifier

View File

@ -25,7 +25,7 @@ class MessageCorrelationMessageInstanceModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
message_instance_id = db.Column(
ForeignKey(MessageInstanceModel.id), nullable=False, index=True
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
)
message_correlation_id = db.Column(
ForeignKey(MessageCorrelationModel.id), nullable=False, index=True

View File

@ -38,15 +38,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
__tablename__ = "message_instance"
id = db.Column(db.Integer, primary_key=True)
id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
message_model = relationship("MessageModel")
message_type = db.Column(db.String(20), nullable=False)
payload = db.Column(db.JSON)
status = db.Column(db.String(20), nullable=False, default="ready")
failure_cause = db.Column(db.Text())
message_type: str = db.Column(db.String(20), nullable=False)
payload: str = db.Column(db.JSON)
status: str = db.Column(db.String(20), nullable=False, default="ready")
failure_cause: str = db.Column(db.Text())
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -406,6 +406,37 @@ def process_instance_log_list(
return make_response(jsonify(response_json), 200)
def message_instance_list(
process_instance_id: Optional[int] = None,
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
"""Message_instance_list."""
# to make sure the process instance exists
message_instances_query = MessageInstanceModel.query
if process_instance_id:
message_instances_query = message_instances_query.filter_by(
process_instance_id=process_instance_id
)
message_instances = message_instances_query.order_by(
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
MessageInstanceModel.id.desc(), # type: ignore
).paginate(page, per_page, False)
response_json = {
"results": message_instances.items,
"pagination": {
"count": len(message_instances.items),
"total": message_instances.total,
"pages": message_instances.pages,
},
}
return make_response(jsonify(response_json), 200)
# body: {
# payload: dict,
# process_instance_id: Optional[int],

View File

@ -1,6 +1,7 @@
"""Background_processing_service."""
import flask
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
@ -17,3 +18,8 @@ class BackgroundProcessingService:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
ProcessInstanceService.do_waiting()
def process_message_instances_with_app_context(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService.process_message_instances()

View File

@ -2,7 +2,6 @@
from typing import Any
from typing import Optional
import flask
from flask_bpmn.models.db import db
from sqlalchemy import and_
from sqlalchemy import or_
@ -26,23 +25,6 @@ from spiffworkflow_backend.services.process_instance_service import (
)
class MessageServiceWithAppContext:
"""Wrapper for Message Service.
This wrappers is to facilitate running the MessageService from the scheduler
since we need to specify the app context then.
"""
def __init__(self, app: flask.app.Flask):
"""__init__."""
self.app = app
def process_message_instances_with_app_context(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService.process_message_instances()
class MessageServiceError(Exception):
"""MessageServiceError."""

View File

@ -61,7 +61,6 @@ class ProcessInstanceService:
@staticmethod
def do_waiting() -> None:
"""Do_waiting."""
current_app.logger.info("ProcessInstanceService: do_waiting()")
records = (
db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.status == ProcessInstanceStatus.waiting.value)

View File

@ -16,9 +16,8 @@ class ServiceTaskDelegate:
"""ServiceTaskDelegate."""
@staticmethod
def normalize_value(v: Any) -> Any:
def normalize_value(value: Any) -> Any:
"""Normalize_value."""
value = v["value"]
secret_prefix = "secret:" # noqa: S105
if value.startswith(secret_prefix):
key = value.removeprefix(secret_prefix)
@ -26,10 +25,13 @@ class ServiceTaskDelegate:
value = key
return value
@classmethod
def call_connector(cls, name: str, bpmn_params: Any) -> str:
@staticmethod
def call_connector(name: str, bpmn_params: Any) -> str:
"""Calls a connector via the configured proxy."""
params = {k: cls.normalize_value(v) for k, v in bpmn_params.items()}
params = {
k: ServiceTaskDelegate.normalize_value(v["value"])
for k, v in bpmn_params.items()
}
proxied_response = requests.get(f"{connector_proxy_url()}/v1/do/{name}", params)
if proxied_response.status_code != 200:

View File

@ -1400,6 +1400,74 @@ class TestProcessApi(BaseTest):
assert result["name"] == file_name
assert bytes(str(result["file_contents"]), "utf-8") == file_data
def test_can_get_message_instances_by_process_instance_id_and_without(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_can_get_message_instances_by_process_instance_id."""
load_test_spec(
"message_receiver",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver",
)
user = self.find_or_create_user()
message_model_identifier = "message_send"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=logged_in_headers(user),
data=json.dumps({"payload": payload}),
)
assert response.status_code == 200
assert response.json is not None
process_instance_id_one = response.json["id"]
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=logged_in_headers(user),
data=json.dumps({"payload": payload}),
)
assert response.status_code == 200
assert response.json is not None
process_instance_id_two = response.json["id"]
response = client.get(
f"/v1.0/messages?process_instance_id={process_instance_id_one}",
headers=logged_in_headers(user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert (
response.json["results"][0]["process_instance_id"]
== process_instance_id_one
)
response = client.get(
f"/v1.0/messages?process_instance_id={process_instance_id_two}",
headers=logged_in_headers(user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert (
response.json["results"][0]["process_instance_id"]
== process_instance_id_two
)
response = client.get(
"/v1.0/messages",
headers=logged_in_headers(user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
# def test_get_process_model(self):
#
# load_test_spec('random_fact')

View File

@ -34,7 +34,7 @@ class TestMessageInstance(BaseTest):
assert queued_message.status == "ready"
assert queued_message.failure_cause is None
queued_message_from_query = MessageInstanceModel.query.filter_by(
queued_message_from_query = MessageInstanceModel.query.filter_by( # type: ignore
id=queued_message.id
).first()
assert queued_message_from_query is not None
@ -137,7 +137,7 @@ class TestMessageInstance(BaseTest):
== "MessageInstanceModel: failure_cause must be set if status is failed"
)
assert queued_message.id is None
db.session.remove()
db.session.remove() # type: ignore
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,