added api to get a list of message instances

This commit is contained in:
jasquat 2022-09-20 13:26:35 -04:00
parent 8c0b2a346c
commit 2c170efde7
6 changed files with 91 additions and 20 deletions

View File

@ -917,6 +917,44 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/messages:
parameters:
- name: process_instance_id
in: query
required: false
description: the id of the process instance
schema:
type: integer
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The number of models to show per page. Defaults to page 10.
schema:
type: integer
get:
tags:
- Messages
operationId: spiffworkflow_backend.routes.process_api_blueprint.message_instance_list
summary: Get a list of message instances
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
responses:
"200":
description: One task
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/messages/{message_identifier}: /messages/{message_identifier}:
parameters: parameters:
- name: message_identifier - name: message_identifier

View File

@ -25,7 +25,7 @@ class MessageCorrelationMessageInstanceModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
message_instance_id = db.Column( message_instance_id = db.Column(
ForeignKey(MessageInstanceModel.id), nullable=False, index=True ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
) )
message_correlation_id = db.Column( message_correlation_id = db.Column(
ForeignKey(MessageCorrelationModel.id), nullable=False, index=True ForeignKey(MessageCorrelationModel.id), nullable=False, index=True

View File

@ -38,15 +38,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
__tablename__ = "message_instance" __tablename__ = "message_instance"
id = db.Column(db.Integer, primary_key=True) id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False) message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
message_model = relationship("MessageModel") message_model = relationship("MessageModel")
message_type = db.Column(db.String(20), nullable=False) message_type: str = db.Column(db.String(20), nullable=False)
payload = db.Column(db.JSON) payload: str = db.Column(db.JSON)
status = db.Column(db.String(20), nullable=False, default="ready") status: str = db.Column(db.String(20), nullable=False, default="ready")
failure_cause = db.Column(db.Text()) failure_cause: str = db.Column(db.Text())
updated_at_in_seconds: int = db.Column(db.Integer) updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -406,6 +406,37 @@ def process_instance_log_list(
return make_response(jsonify(response_json), 200) return make_response(jsonify(response_json), 200)
def message_instance_list(
process_instance_id: Optional[int] = None,
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
"""Message_instance_list."""
# to make sure the process instance exists
message_instances_query = MessageInstanceModel.query
if process_instance_id:
message_instances_query = message_instances_query.filter_by(
process_instance_id=process_instance_id
)
message_instances = message_instances_query.order_by(
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
MessageInstanceModel.id.desc(), # type: ignore
).paginate(page, per_page, False)
response_json = {
"results": message_instances.items,
"pagination": {
"count": len(message_instances.items),
"total": message_instances.total,
"pages": message_instances.pages,
},
}
return make_response(jsonify(response_json), 200)
# body: { # body: {
# payload: dict, # payload: dict,
# process_instance_id: Optional[int], # process_instance_id: Optional[int],

View File

@ -15,21 +15,23 @@ def connector_proxy_url() -> Any:
class ServiceTaskDelegate: class ServiceTaskDelegate:
"""ServiceTaskDelegate.""" """ServiceTaskDelegate."""
@staticmethod
def normalize_value(value: Any) -> Any:
"""Normalize_value."""
secret_prefix = "secret:" # noqa: S105
if value.startswith(secret_prefix):
key = value.removeprefix(secret_prefix)
# TODO replace with call to secret store
value = key
return value
@staticmethod @staticmethod
def call_connector(name: str, bpmn_params: Any) -> str: def call_connector(name: str, bpmn_params: Any) -> str:
"""Calls a connector via the configured proxy.""" """Calls a connector via the configured proxy."""
params = {
def normalize_value(v: Any) -> Any: k: ServiceTaskDelegate.normalize_value(v["value"])
"""Normalize_value.""" for k, v in bpmn_params.items()
value = v["value"] }
secret_prefix = "secret:" # noqa: S105
if value.startswith(secret_prefix):
key = value.removeprefix(secret_prefix)
# TODO replace with call to secret store
value = key
return value
params = {k: normalize_value(v) for k, v in bpmn_params.items()}
proxied_response = requests.get(f"{connector_proxy_url()}/v1/do/{name}", params) proxied_response = requests.get(f"{connector_proxy_url()}/v1/do/{name}", params)
if proxied_response.status_code != 200: if proxied_response.status_code != 200:

View File

@ -34,7 +34,7 @@ class TestMessageInstance(BaseTest):
assert queued_message.status == "ready" assert queued_message.status == "ready"
assert queued_message.failure_cause is None assert queued_message.failure_cause is None
queued_message_from_query = MessageInstanceModel.query.filter_by( queued_message_from_query = MessageInstanceModel.query.filter_by( # type: ignore
id=queued_message.id id=queued_message.id
).first() ).first()
assert queued_message_from_query is not None assert queued_message_from_query is not None
@ -137,7 +137,7 @@ class TestMessageInstance(BaseTest):
== "MessageInstanceModel: failure_cause must be set if status is failed" == "MessageInstanceModel: failure_cause must be set if status is failed"
) )
assert queued_message.id is None assert queued_message.id is None
db.session.remove() db.session.remove() # type: ignore
queued_message = MessageInstanceModel( queued_message = MessageInstanceModel(
process_instance_id=process_instance.id, process_instance_id=process_instance.id,