wip: trying to get a test going for messages

This commit is contained in:
jasquat 2022-08-15 09:19:53 -04:00
parent ccd67489cb
commit 435b1ccde8
6 changed files with 103 additions and 95 deletions

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 6ea93d3b0d76
Revision ID: e093f2840fcd
Revises:
Create Date: 2022-08-10 13:12:22.289229
Create Date: 2022-08-11 15:42:44.848283
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ea93d3b0d76'
revision = 'e093f2840fcd'
down_revision = None
branch_labels = None
depends_on = None
@ -149,7 +149,7 @@ def upgrade():
sa.ForeignKeyConstraint(['user_uid'], ['user.uid'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('queued_send_message',
op.create_table('message_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('message_model_id', sa.Integer(), nullable=False),
@ -201,7 +201,7 @@ def upgrade():
sa.Column('message_instance_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('value', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['message_instance_id'], ['queued_send_message.id'], ),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique')
)
@ -219,7 +219,7 @@ def downgrade():
op.drop_table('message_correlation')
op.drop_table('data_store')
op.drop_table('task_event')
op.drop_table('queued_send_message')
op.drop_table('message_instance')
op.drop_table('file')
op.drop_table('active_task')
op.drop_table('user_group_assignment')

View File

@ -35,7 +35,7 @@ class MessageStatuses(enum.Enum):
class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Messages from a process instance that are ready to send to a receiving task."""
__tablename__ = "queued_send_message"
__tablename__ = "message_instance"
id = db.Column(db.Integer, primary_key=True)
process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore

View File

@ -51,7 +51,7 @@ class MessageService:
for queued_message_send in queued_messages_send:
# check again in case another background process picked up the message
# while the previous one was running
if queued_message_send.status != "receive":
if queued_message_send.status != "ready":
continue
queued_message_send.status = "running"
@ -84,6 +84,7 @@ class MessageService:
db.session.add(queued_message_receive)
db.session.commit()
raise exception
def process_message_receive(
self,
@ -101,17 +102,18 @@ class MessageService:
)
processor_send = ProcessInstanceProcessor(process_instance_send)
spiff_task_send = processor_send.bpmn_process_instance.get_task_by_id(
spiff_task_send = processor_send.get_task_by_id(
queued_message_send.bpmn_element_id
)
print(f"queued_message_send.bpmn_element_id: {queued_message_send.bpmn_element_id}")
if spiff_task_send is None:
raise MessageServiceError(
"Processor failed to obtain task.",
)
message_event_send = MessageEventDefinition(
spiff_task_send.id, payload=spiff_task_send.payload
)
# message_event_send = MessageEventDefinition(
# spiff_task_send.id, payload=spiff_task_send.payload
# )
process_instance_receive = ProcessInstanceModel.query.filter_by(
id=queued_message_receive.process_instance_id
@ -125,7 +127,8 @@ class MessageService:
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch(message_event_send)
import pdb; pdb.set_trace()
processor_receive.bpmn_process_instance.catch_bpmn_message(spiff_task_send.id, spiff_task_send.payload)
def get_queued_message_receive(
self,

View File

@ -78,22 +78,22 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
"""Evaluate."""
return self._evaluate(expression, task.data, task)
def _evaluate(
self,
expression: str,
context: Dict[str, Union[Box, str]],
task: Optional[SpiffTask] = None,
_external_methods: None = None,
) -> Any:
"""Evaluate the given expression, within the context of the given task and return the result."""
try:
return super()._evaluate(expression, context, task, {})
except Exception as exception:
raise WorkflowTaskExecException(
task,
"Error evaluating expression "
"'%s', %s" % (expression, str(exception)),
) from exception
# def _evaluate(
# self,
# expression: str,
# context: Dict[str, Union[Box, str]],
# task: Optional[SpiffTask] = None,
# _external_methods: None = None,
# ) -> Any:
# """Evaluate the given expression, within the context of the given task and return the result."""
# try:
# return super()._evaluate(expression, context, task, {})
# except Exception as exception:
# raise WorkflowTaskExecException(
# task,
# "Error evaluating expression "
# "'%s', %s" % (expression, str(exception)),
# ) from exception
def execute(
self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]]
@ -487,73 +487,73 @@ class ProcessInstanceProcessor:
"""Get_status."""
return self.status_of(self.bpmn_process_instance)
def process_bpmn_events(self) -> None:
"""Process_bpmn_events."""
if self.bpmn_process_instance.bpmn_events:
for bpmn_event in self.bpmn_process_instance.bpmn_events:
message_type = None
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
for bpmn_message in self.bpmn_process_instance.get_bpmn_messages():
print("WE PROCESS")
message_type = None
# TODO: message: who knows the name of the message model?
# will it be in the bpmn_event?
message_model = MessageModel.query.filter_by(
name=bpmn_event.message_name
).first()
# TODO: message: who knows the name of the message model?
# will it be in the bpmn_message?
message_model = MessageModel.query.filter_by(
name=bpmn_message.message_name
).first()
if message_model is None:
raise ApiError(
"invalid_message_name",
f"Invalid message name: {bpmn_event.message_name}.",
)
# TODO: message - not sure how to determine message types yet
if bpmn_event.event == "WaitEvent": # and waiting for message:
message_type = "receive"
elif bpmn_event.event == "SendEvent":
message_type = "send"
if message_type is None:
raise ApiError(
"invalid_event_type",
f"Invalid event type for a message: {bpmn_event.event}.",
)
if not bpmn_event.message_correlations:
raise ApiError(
"message_correlations_missing",
f"Could not find any message correlations bpmn_event: {bpmn_event}",
)
for message_correlation in bpmn_event.message_correlations:
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
message_model_id=message_model.id,
identifier=message_correlation.identifier,
).first()
)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
f"Could not find a known message correlation with identifier: {message_correlation.identifier}",
)
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
bpmn_element_id=bpmn_event.task_name,
message_type=message_type,
message_model_id=message_model.id,
if message_model is None:
raise ApiError(
"invalid_message_name",
f"Invalid message name: {bpmn_message.message_name}.",
)
db.session.add(message_instance)
db.session.commit()
# TODO: find out what spiff will call the correlations
for message_correlation in bpmn_event.message_correlations:
message_correlation = MessageCorrelationModel(
message_instance_id=message_instance.id,
name=message_correlation.name,
value=message_correlation.value,
# TODO: message - not sure how to determine message types yet
if bpmn_message.event == "WaitEvent": # and waiting for message:
message_type = "receive"
elif bpmn_message.event == "SendEvent":
message_type = "send"
if message_type is None:
raise ApiError(
"invalid_event_type",
f"Invalid event type for a message: {bpmn_message.event}.",
)
if not bpmn_message.message_correlations:
raise ApiError(
"message_correlations_missing",
f"Could not find any message correlations bpmn_message: {bpmn_message}",
)
for message_correlation in bpmn_message.message_correlations:
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
message_model_id=message_model.id,
identifier=message_correlation.identifier,
).first()
)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
f"Could not find a known message correlation with identifier: {message_correlation.identifier}",
)
db.session.add(message_correlation)
db.session.commit()
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
bpmn_element_id=bpmn_message.task_name,
message_type=message_type,
message_model_id=message_model.id,
)
db.session.add(message_instance)
db.session.commit()
# TODO: find out what spiff will call the correlations
for message_correlation in bpmn_message.message_correlations:
message_correlation = MessageCorrelationModel(
message_instance_id=message_instance.id,
name=message_correlation.name,
value=message_correlation.value,
)
db.session.add(message_correlation)
db.session.commit()
def do_engine_steps(self, exit_at: None = None) -> None:
"""Do_engine_steps."""
@ -561,7 +561,7 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance.refresh_waiting_tasks()
self.bpmn_process_instance.do_engine_steps(exit_at=exit_at)
# TODO: run this
# self.process_bpmn_events()
self.process_bpmn_messages()
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
@ -725,7 +725,10 @@ class ProcessInstanceProcessor:
def get_task_by_id(self, task_id: str) -> SpiffTask:
"""Get_task_by_id."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [t for t in all_tasks if t.id == task_id]
for task in all_tasks:
if task.id == task_id:
return task
return None
def get_nav_item(self, task: SpiffTask) -> Any:
"""Get_nav_item."""

View File

@ -70,7 +70,6 @@ class BaseTest:
updated_at_in_seconds=round(time.time()),
start_in_seconds=current_time - (3600 * 1),
end_in_seconds=current_time - (3600 * 1 - 20),
bpmn_json=json.dumps({"ikey": "ivalue"}),
)
db.session.add(process_instance)
db.session.commit()

View File

@ -1,4 +1,5 @@
"""Test_message_service."""
from spiffworkflow_backend.services.message_service import MessageService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -77,4 +78,6 @@ class TestMessageService(BaseTest):
db.session.add(message_correlation_two_receive)
db.session.commit()
# MessageService().process_queued_messages()
MessageService().process_queued_messages()
print(queued_message_send.failure_cause)
print(queued_message_send.status)