Feature/event payloads part 2 (#401)
* Revert "Revert "Feature/event payloads (#393)"" This reverts commit95fafb7af1
. * Revert "Revert "poet not available in container"" This reverts commit140220498c
. * Revert "Revert "Run event payloads data migration from background processor (#399)"" This reverts commit2afced3a51
. * Revert "Revert "using new spiff api to get info about events. w/ elizabeth"" This reverts commitaf857fee22
. * Revert "Revert "fix tests for waiting_event_can_be_skipped"" This reverts commit886e6bd42a
. * push image for preview env * default scripts to localhost w/ burnettk * use the bugfix/update-split-task-inputs spiffworkflow branch w/ burnettk * removed debug json files * use main for spiffworkflow * do not attempt to highlight non-diagram boundary items w/ burnettk * updated SpiffWorkflow to fix multiple signal event issue w/ burnettk --------- Co-authored-by: burnettk <burnettk@users.noreply.github.com> Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
8cf8387cd6
commit
9313a9f73a
|
@ -31,6 +31,7 @@ on:
|
|||
push:
|
||||
branches:
|
||||
- main
|
||||
- feature/event-payloads-part-2
|
||||
- spiffdemo
|
||||
|
||||
jobs:
|
||||
|
|
|
@ -38,7 +38,7 @@ function run_autofixers() {
|
|||
asdf reshim python
|
||||
fi
|
||||
|
||||
python_dirs=$(get_python_dirs)
|
||||
python_dirs="$(get_python_dirs) bin"
|
||||
ruff --fix $python_dirs || echo ''
|
||||
}
|
||||
|
||||
|
|
|
@ -88,6 +88,10 @@ if [[ -z "${SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER:-}" ]]; then
|
|||
export SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER
|
||||
fi
|
||||
|
||||
# DELETE after this runs on all necessary environments
|
||||
# TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations
|
||||
poetry run python ./bin/data_migrations/version_1_3.py
|
||||
|
||||
# --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh
|
||||
export IS_GUNICORN="true"
|
||||
# THIS MUST BE THE LAST COMMAND!
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
import time
|
||||
|
||||
from spiffworkflow_backend import create_app
|
||||
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
|
||||
|
||||
|
||||
def main() -> None:
|
||||
app = create_app()
|
||||
start_time = time.time()
|
||||
|
||||
with app.app_context():
|
||||
VersionOneThree().run()
|
||||
|
||||
end_time = time.time()
|
||||
print(
|
||||
f"done running data migration from ./bin/data_migrations/version_1_3.py. took {end_time - start_time} seconds"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -21,11 +21,9 @@ set -o errtrace -o errexit -o nounset -o pipefail
|
|||
# ./bin/get_token ciadmin1 ciadmin1 '%2Fprocess-models'
|
||||
|
||||
if [[ -z "${BACKEND_BASE_URL:-}" ]]; then
|
||||
# BACKEND_BASE_URL=http://localhost:7000
|
||||
BACKEND_BASE_URL=https://api.dev.spiffworkflow.org
|
||||
BACKEND_BASE_URL=http://localhost:7000
|
||||
fi
|
||||
if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then
|
||||
# KEYCLOAK_BASE_URL=http://localhost:7002
|
||||
if grep -qE "spiffworkflow.org" <<<"$BACKEND_BASE_URL" ; then
|
||||
env_domain=$(hot_sed -E 's/.*api\.(\w+\.spiffworkflow.org).*/\1/' <<<"${BACKEND_BASE_URL}")
|
||||
KEYCLOAK_BASE_URL="https://keycloak.${env_domain}"
|
||||
|
|
|
@ -10,12 +10,10 @@ set -o errtrace -o errexit -o nounset -o pipefail
|
|||
script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
|
||||
|
||||
if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then
|
||||
# export KEYCLOAK_BASE_URL=http://localhost:7002
|
||||
export KEYCLOAK_BASE_URL=https://keycloak.dev.spiffworkflow.org
|
||||
export KEYCLOAK_BASE_URL=http://localhost:7002
|
||||
fi
|
||||
if [[ -z "${BACKEND_BASE_URL:-}" ]]; then
|
||||
# export BACKEND_BASE_URL=http://localhost:7000
|
||||
export BACKEND_BASE_URL=https://api.dev.spiffworkflow.org
|
||||
export BACKEND_BASE_URL=http://localhost:7000
|
||||
fi
|
||||
|
||||
user_list="${1}"
|
||||
|
|
|
@ -4,16 +4,29 @@ import time
|
|||
from apscheduler.schedulers.background import BlockingScheduler # type: ignore
|
||||
from spiffworkflow_backend import create_app
|
||||
from spiffworkflow_backend import start_scheduler
|
||||
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
|
||||
from spiffworkflow_backend.helpers.db_helper import try_to_connect
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main."""
|
||||
seconds_to_wait = 300
|
||||
print(f"sleeping for {seconds_to_wait} seconds to give the api container time to run the migration")
|
||||
time.sleep(seconds_to_wait)
|
||||
print("done sleeping")
|
||||
|
||||
print("running data migration from background processor")
|
||||
app = create_app()
|
||||
start_time = time.time()
|
||||
|
||||
with app.app_context():
|
||||
try_to_connect(start_time)
|
||||
VersionOneThree().run()
|
||||
|
||||
end_time = time.time()
|
||||
print(
|
||||
f"done running data migration from background processor. took {end_time - start_time} seconds. starting"
|
||||
" scheduler"
|
||||
)
|
||||
start_scheduler(app, BlockingScheduler)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: 214e0c5fb418
|
||||
Revises: 64adf34a98db
|
||||
Create Date: 2023-07-11 14:52:15.825136
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '214e0c5fb418'
|
||||
down_revision = '64adf34a98db'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('correlation_key_names', sa.JSON(), nullable=True))
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('message_instance_correlation_rule', schema=None) as batch_op:
|
||||
batch_op.drop_column('correlation_key_names')
|
||||
|
||||
# ### end Alembic commands ###
|
|
@ -2285,7 +2285,7 @@ lxml = "*"
|
|||
type = "git"
|
||||
url = "https://github.com/sartography/SpiffWorkflow"
|
||||
reference = "main"
|
||||
resolved_reference = "9bd018e596d42b897da67ff894f458d6bc40bdf9"
|
||||
resolved_reference = "0adfc8cbaec80d36f98a4136434e960f666fcfe2"
|
||||
|
||||
[[package]]
|
||||
name = "sqlalchemy"
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
import copy
|
||||
import json
|
||||
import uuid
|
||||
from hashlib import sha256
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.task import Task
|
||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
|
||||
class VersionOneThree:
|
||||
"""Migrates data in the database to be compatible with SpiffWorkflow at git revision ebcdde95.
|
||||
|
||||
Converts migration file from SpiffWorkflow to work with backend's db:
|
||||
https://github.com/sartography/SpiffWorkflow/blob/main/SpiffWorkflow/bpmn/serializer/migration/version_1_3.py
|
||||
"""
|
||||
|
||||
def run(self) -> None:
|
||||
print("start VersionOneThree.run")
|
||||
task_definitions = self.get_relevant_task_definitions()
|
||||
print(f"found relevant task_definitions: {len(task_definitions)}")
|
||||
for task_definition in task_definitions:
|
||||
self.process_task_definition(task_definition)
|
||||
relating_task_models = TaskModel.query.filter_by(task_definition_id=task_definition.id).all()
|
||||
for task_model in relating_task_models:
|
||||
self.process_task_model(task_model, task_definition)
|
||||
|
||||
task_definitions_with_events = TaskDefinitionModel.query.filter(
|
||||
or_(
|
||||
TaskDefinitionModel.typename.like("%Event%"), # type: ignore
|
||||
TaskDefinitionModel.typename.in_(["SendTask", "ReceiveTask"]), # type: ignore
|
||||
)
|
||||
).all()
|
||||
for tdwe in task_definitions_with_events:
|
||||
self.update_event_definitions(tdwe)
|
||||
|
||||
db.session.commit()
|
||||
print("end VersionOneThree.run")
|
||||
|
||||
def get_relevant_task_definitions(self) -> list[TaskDefinitionModel]:
|
||||
task_definitions: list[TaskDefinitionModel] = TaskDefinitionModel.query.filter_by(
|
||||
typename="_BoundaryEventParent"
|
||||
).all()
|
||||
return task_definitions
|
||||
|
||||
def process_task_definition(self, task_definition: TaskDefinitionModel) -> None:
|
||||
task_definition.typename = "BoundaryEventSplit"
|
||||
task_definition.bpmn_identifier = task_definition.bpmn_identifier.replace(
|
||||
"BoundaryEventParent", "BoundaryEventSplit"
|
||||
)
|
||||
|
||||
properties_json = copy.copy(task_definition.properties_json)
|
||||
properties_json.pop("main_child_task_spec")
|
||||
properties_json["typename"] = task_definition.typename
|
||||
properties_json["name"] = task_definition.bpmn_identifier
|
||||
task_definition.properties_json = properties_json
|
||||
flag_modified(task_definition, "properties_json") # type: ignore
|
||||
db.session.add(task_definition)
|
||||
|
||||
join_properties_json = {
|
||||
"name": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
|
||||
"manual": False,
|
||||
"bpmn_id": None,
|
||||
"lookahead": 2,
|
||||
"inputs": properties_json["outputs"],
|
||||
"outputs": [],
|
||||
"split_task": task_definition.bpmn_identifier,
|
||||
"threshold": None,
|
||||
"cancel": True,
|
||||
"typename": "BoundaryEventJoin",
|
||||
}
|
||||
|
||||
join_task_definition = TaskDefinitionModel(
|
||||
bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
|
||||
bpmn_identifier=join_properties_json["name"],
|
||||
typename=join_properties_json["typename"],
|
||||
properties_json=join_properties_json,
|
||||
)
|
||||
db.session.add(join_task_definition)
|
||||
|
||||
for parent_bpmn_identifier in properties_json["inputs"]:
|
||||
parent_task_definition = TaskDefinitionModel.query.filter_by(
|
||||
bpmn_identifier=parent_bpmn_identifier,
|
||||
bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
|
||||
).first()
|
||||
parent_task_definition.properties_json["outputs"] = [
|
||||
name.replace("BoundaryEventParent", "BoundaryEventSplit")
|
||||
for name in parent_task_definition.properties_json["outputs"]
|
||||
]
|
||||
flag_modified(parent_task_definition, "properties_json") # type: ignore
|
||||
db.session.add(parent_task_definition)
|
||||
|
||||
for child_bpmn_identifier in properties_json["outputs"]:
|
||||
child_task_definition = TaskDefinitionModel.query.filter_by(
|
||||
bpmn_identifier=child_bpmn_identifier,
|
||||
bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
|
||||
).first()
|
||||
child_task_definition.properties_json["outputs"].append(join_task_definition.bpmn_identifier)
|
||||
child_task_definition.properties_json["inputs"] = [
|
||||
name.replace("BoundaryEventParent", "BoundaryEventSplit")
|
||||
for name in child_task_definition.properties_json["inputs"]
|
||||
]
|
||||
flag_modified(child_task_definition, "properties_json") # type: ignore
|
||||
db.session.add(child_task_definition)
|
||||
|
||||
def process_task_model(self, task_model: TaskModel, task_definition: TaskDefinitionModel) -> None:
|
||||
task_model.properties_json["task_spec"] = task_definition.bpmn_identifier
|
||||
flag_modified(task_model, "properties_json") # type: ignore
|
||||
db.session.add(task_model)
|
||||
|
||||
child_task_models = []
|
||||
all_children_completed = True
|
||||
|
||||
# Ruff keeps complaining unless it's done like this
|
||||
blank_json = json.dumps({})
|
||||
blank_json_data_hash = sha256(blank_json.encode("utf8")).hexdigest()
|
||||
|
||||
for child_task_guid in task_model.properties_json["children"]:
|
||||
child_task_model = TaskModel.query.filter_by(guid=child_task_guid).first()
|
||||
if child_task_model is None:
|
||||
continue
|
||||
if child_task_model.state not in ["COMPLETED", "CANCELLED"]:
|
||||
all_children_completed = False
|
||||
child_task_models.append(child_task_model)
|
||||
|
||||
for child_task_model in child_task_models:
|
||||
if child_task_model.state == "CANCELLED":
|
||||
# Cancelled tasks don't have children
|
||||
continue
|
||||
|
||||
new_task_state = None
|
||||
start_in_seconds = child_task_model.start_in_seconds
|
||||
end_in_seconds = None
|
||||
|
||||
if child_task_model.state in ["MAYBE", "LIKELY", "FUTURE"]:
|
||||
new_task_state = child_task_model.state
|
||||
elif child_task_model.state in ["WAITING", "READY", "STARTED"]:
|
||||
new_task_state = "FUTURE"
|
||||
elif child_task_model.state == "COMPLETED":
|
||||
if all_children_completed:
|
||||
new_task_state = "COMPLETED"
|
||||
end_in_seconds = child_task_model.end_in_seconds
|
||||
else:
|
||||
new_task_state = "WAITING"
|
||||
elif child_task_model.state == "ERROR":
|
||||
new_task_state = "WAITING"
|
||||
else:
|
||||
raise Exception(f"Unknown state: {child_task_model.state} for {child_task_model.guild}")
|
||||
|
||||
new_task_properties_json = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"parent": child_task_model.guid,
|
||||
"children": [],
|
||||
"state": Task.task_state_name_to_int(new_task_state),
|
||||
"task_spec": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
|
||||
"last_state_change": None,
|
||||
"triggered": False,
|
||||
"internal_data": {},
|
||||
}
|
||||
|
||||
new_task_model = TaskModel(
|
||||
guid=new_task_properties_json["id"],
|
||||
bpmn_process_id=task_model.bpmn_process_id,
|
||||
process_instance_id=task_model.process_instance_id,
|
||||
task_definition_id=task_model.task_definition_id,
|
||||
state=new_task_state,
|
||||
properties_json=new_task_properties_json,
|
||||
start_in_seconds=start_in_seconds,
|
||||
end_in_seconds=end_in_seconds,
|
||||
json_data_hash=blank_json_data_hash,
|
||||
python_env_data_hash=blank_json_data_hash,
|
||||
)
|
||||
db.session.add(new_task_model)
|
||||
|
||||
child_task_model.properties_json["children"].append(new_task_model.guid)
|
||||
flag_modified(child_task_model, "properties_json") # type: ignore
|
||||
db.session.add(child_task_model)
|
||||
|
||||
def update_event_definitions(self, task_definition: TaskDefinitionModel) -> None:
|
||||
if "event_definition" in task_definition.properties_json:
|
||||
properties_json = copy.copy(task_definition.properties_json)
|
||||
properties_json["event_definition"].pop("internal", None)
|
||||
properties_json["event_definition"].pop("external", None)
|
||||
if "escalation_code" in properties_json["event_definition"]:
|
||||
properties_json["event_definition"]["code"] = properties_json["event_definition"].pop(
|
||||
"escalation_code"
|
||||
)
|
||||
if "error_code" in properties_json["event_definition"]:
|
||||
properties_json["event_definition"]["code"] = properties_json["event_definition"].pop("error_code")
|
||||
task_definition.properties_json = properties_json
|
||||
flag_modified(task_definition, "properties_json") # type: ignore
|
||||
db.session.add(task_definition)
|
|
@ -29,8 +29,11 @@ class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel):
|
|||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
message_instance_id = db.Column(ForeignKey(MessageInstanceModel.id), nullable=False, index=True) # type: ignore
|
||||
|
||||
name: str = db.Column(db.String(50), nullable=False, index=True)
|
||||
retrieval_expression: str = db.Column(db.String(255))
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
correlation_key_names: list = db.Column(db.JSON)
|
||||
|
||||
message_instance = relationship("MessageInstanceModel", back_populates="correlation_rules")
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
from SpiffWorkflow.bpmn.workflow import BpmnMessage # type: ignore
|
||||
from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty # type: ignore
|
||||
from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition # type: ignore
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageStatuses
|
||||
|
@ -72,8 +74,7 @@ class MessageService:
|
|||
cls.process_message_receive(
|
||||
receiving_process,
|
||||
message_instance_receive,
|
||||
message_instance_send.name,
|
||||
message_instance_send.payload,
|
||||
message_instance_send,
|
||||
)
|
||||
message_instance_receive.status = "completed"
|
||||
message_instance_receive.counterpart_id = message_instance_send.id
|
||||
|
@ -130,7 +131,7 @@ class MessageService:
|
|||
(
|
||||
(
|
||||
"Process instance cannot be found for queued message:"
|
||||
f" {message_instance_receive.id}.Tried with id"
|
||||
f" {message_instance_receive.id}. Tried with id"
|
||||
f" {message_instance_receive.process_instance_id}"
|
||||
),
|
||||
)
|
||||
|
@ -141,16 +142,28 @@ class MessageService:
|
|||
def process_message_receive(
|
||||
process_instance_receive: ProcessInstanceModel,
|
||||
message_instance_receive: MessageInstanceModel,
|
||||
message_model_name: str,
|
||||
message_payload: dict,
|
||||
message_instance_send: MessageInstanceModel,
|
||||
) -> None:
|
||||
bpmn_message = BpmnMessage(
|
||||
None,
|
||||
message_model_name,
|
||||
message_payload,
|
||||
correlation_properties = []
|
||||
for cr in message_instance_receive.correlation_rules:
|
||||
correlation_properties.append(
|
||||
CorrelationProperty(
|
||||
name=cr.name,
|
||||
retrieval_expression=cr.retrieval_expression,
|
||||
correlation_keys=cr.correlation_key_names,
|
||||
)
|
||||
)
|
||||
bpmn_message = MessageEventDefinition(
|
||||
name=message_instance_send.name,
|
||||
correlation_properties=correlation_properties,
|
||||
)
|
||||
bpmn_event = BpmnEvent(
|
||||
event_definition=bpmn_message,
|
||||
payload=message_instance_send.payload,
|
||||
correlations=message_instance_send.correlation_keys,
|
||||
)
|
||||
processor_receive = ProcessInstanceProcessor(process_instance_receive)
|
||||
processor_receive.bpmn_process_instance.catch_bpmn_message(bpmn_message)
|
||||
processor_receive.bpmn_process_instance.send_event(bpmn_event)
|
||||
processor_receive.do_engine_steps(save=True)
|
||||
message_instance_receive.status = MessageStatuses.completed.value
|
||||
db.session.add(message_instance_receive)
|
||||
|
|
|
@ -24,6 +24,7 @@ from flask import current_app
|
|||
from lxml import etree # type: ignore
|
||||
from lxml.etree import XMLSyntaxError # type: ignore
|
||||
from RestrictedPython import safe_globals # type: ignore
|
||||
from SpiffWorkflow.bpmn.event import BpmnEvent # type: ignore
|
||||
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
|
||||
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
|
||||
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
|
||||
|
@ -1089,10 +1090,12 @@ class ProcessInstanceProcessor:
|
|||
"""Send an event to the workflow."""
|
||||
payload = event_data.pop("payload", None)
|
||||
event_definition = self._event_serializer.registry.restore(event_data)
|
||||
if payload is not None:
|
||||
event_definition.payload = payload
|
||||
bpmn_event = BpmnEvent(
|
||||
event_definition=event_definition,
|
||||
payload=payload,
|
||||
)
|
||||
try:
|
||||
self.bpmn_process_instance.catch(event_definition)
|
||||
self.bpmn_process_instance.send_event(bpmn_event)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
|
|
@ -10,8 +10,9 @@ from urllib.parse import unquote
|
|||
import sentry_sdk
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
from SpiffWorkflow.bpmn.specs.control import _BoundaryEventParent # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.event import PendingBpmnEvent # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition # type: ignore
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
from spiffworkflow_backend import db
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
|
@ -174,16 +175,16 @@ class ProcessInstanceService:
|
|||
db.session.commit()
|
||||
|
||||
@classmethod
|
||||
def waiting_event_can_be_skipped(cls, waiting_event: dict[str, Any], now_in_utc: datetime) -> bool:
|
||||
def waiting_event_can_be_skipped(cls, waiting_event: PendingBpmnEvent, now_in_utc: datetime) -> bool:
|
||||
#
|
||||
# over time this function can gain more knowledge of different event types,
|
||||
# for now we are just handling Duration Timer events.
|
||||
#
|
||||
# example: {'event_type': 'Duration Timer', 'name': None, 'value': '2023-04-27T20:15:10.626656+00:00'}
|
||||
#
|
||||
spiff_event_type = waiting_event.get("event_type")
|
||||
spiff_event_type = waiting_event.event_type
|
||||
if spiff_event_type == "DurationTimerEventDefinition":
|
||||
event_value = waiting_event.get("value")
|
||||
event_value = waiting_event.value
|
||||
if event_value is not None:
|
||||
event_datetime = TimerEventDefinition.get_datetime(event_value)
|
||||
return event_datetime > now_in_utc # type: ignore
|
||||
|
@ -199,7 +200,7 @@ class ProcessInstanceService:
|
|||
@classmethod
|
||||
def ready_user_task_has_associated_timer(cls, processor: ProcessInstanceProcessor) -> bool:
|
||||
for ready_user_task in processor.bpmn_process_instance.get_ready_user_tasks():
|
||||
if isinstance(ready_user_task.parent.task_spec, _BoundaryEventParent):
|
||||
if isinstance(ready_user_task.parent.task_spec, BoundaryEventSplit):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ from uuid import UUID
|
|||
|
||||
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
|
||||
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
|
@ -432,15 +433,21 @@ class WorkflowExecutionService:
|
|||
self.process_instance_saver()
|
||||
|
||||
def process_bpmn_messages(self) -> None:
|
||||
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
||||
for bpmn_message in bpmn_messages:
|
||||
# FIXE: get_events clears out the events so if we have other events we care about
|
||||
# this will clear them out as well.
|
||||
# Right now we only care about messages though.
|
||||
bpmn_events = self.bpmn_process_instance.get_events()
|
||||
for bpmn_event in bpmn_events:
|
||||
if not isinstance(bpmn_event.event_definition, MessageEventDefinition):
|
||||
continue
|
||||
bpmn_message = bpmn_event.event_definition
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
user_id=self.process_instance_model.process_initiator_id,
|
||||
# TODO: use the correct swimlane user when that is set up
|
||||
message_type="send",
|
||||
name=bpmn_message.name,
|
||||
payload=bpmn_message.payload,
|
||||
payload=bpmn_event.payload,
|
||||
correlation_keys=self.bpmn_process_instance.correlations,
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
|
@ -450,21 +457,21 @@ class WorkflowExecutionService:
|
|||
bpmn_process_correlations = self.bpmn_process_instance.correlations
|
||||
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
|
||||
# update correlations correctly but always null out bpmn_messages since they get cleared out later
|
||||
bpmn_process.properties_json["bpmn_messages"] = []
|
||||
bpmn_process.properties_json["bpmn_events"] = []
|
||||
db.session.add(bpmn_process)
|
||||
|
||||
db.session.commit()
|
||||
|
||||
def queue_waiting_receive_messages(self) -> None:
|
||||
waiting_events = self.bpmn_process_instance.waiting_events()
|
||||
waiting_message_events = filter(lambda e: e["event_type"] == "MessageEventDefinition", waiting_events)
|
||||
waiting_message_events = filter(lambda e: e.event_type == "MessageEventDefinition", waiting_events)
|
||||
for event in waiting_message_events:
|
||||
# Ensure we are only creating one message instance for each waiting message
|
||||
if (
|
||||
MessageInstanceModel.query.filter_by(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_type="receive",
|
||||
name=event["name"],
|
||||
name=event.name,
|
||||
).count()
|
||||
> 0
|
||||
):
|
||||
|
@ -475,14 +482,15 @@ class WorkflowExecutionService:
|
|||
process_instance_id=self.process_instance_model.id,
|
||||
user_id=self.process_instance_model.process_initiator_id,
|
||||
message_type="receive",
|
||||
name=event["name"],
|
||||
name=event.name,
|
||||
correlation_keys=self.bpmn_process_instance.correlations,
|
||||
)
|
||||
for correlation_property in event["value"]:
|
||||
for correlation_property in event.value:
|
||||
message_correlation = MessageInstanceCorrelationRuleModel(
|
||||
message_instance=message_instance,
|
||||
name=correlation_property.name,
|
||||
retrieval_expression=correlation_property.retrieval_expression,
|
||||
correlation_key_names=correlation_property.correlation_keys,
|
||||
)
|
||||
db.session.add(message_correlation)
|
||||
db.session.add(message_instance)
|
||||
|
|
|
@ -6,17 +6,17 @@ from SpiffWorkflow.bpmn.parser.util import full_tag # type: ignore
|
|||
from SpiffWorkflow.bpmn.serializer.task_spec import EventConverter # type: ignore
|
||||
from SpiffWorkflow.bpmn.serializer.task_spec import StartEventConverter as DefaultStartEventConverter
|
||||
from SpiffWorkflow.bpmn.specs.defaults import StartEvent as DefaultStartEvent # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import CycleTimerEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import DurationTimerEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import NoneEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import TimeDateEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions import TimerEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.simple import NoneEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.timer import CycleTimerEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.timer import DurationTimerEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimeDateEventDefinition
|
||||
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition
|
||||
from SpiffWorkflow.spiff.parser.event_parsers import SpiffStartEventParser # type: ignore
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
|
||||
StartConfiguration = tuple[int, int, int]
|
||||
|
||||
# TODO: cylce timers and repeat counts?
|
||||
# TODO: cycle timers and repeat counts?
|
||||
|
||||
|
||||
class StartEvent(DefaultStartEvent): # type: ignore
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
|
||||
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
|
||||
<bpmn:process id="test_process_to_call" name="Test Process To Call" isExecutable="true">
|
||||
<bpmn:endEvent id="Event_03zsjvn">
|
||||
<bpmn:incoming>Flow_089aeua</bpmn:incoming>
|
||||
|
@ -51,9 +51,14 @@
|
|||
<bpmn:script>set_in_test_process_to_call_script = 1</bpmn:script>
|
||||
</bpmn:scriptTask>
|
||||
<bpmn:boundaryEvent id="our_boundary_event" name="our_boundary_event" attachedToRef="test_process_to_call_script">
|
||||
<bpmn:escalationEventDefinition id="EscalationEventDefinition_1bs7saf" />
|
||||
<bpmn:escalationEventDefinition id="EscalationEventDefinition_0t7834v" escalationRef="Escalation_18qf8th" errorRef="[object Object]" />
|
||||
</bpmn:boundaryEvent>
|
||||
</bpmn:process>
|
||||
<bpmn:escalation id="Escalation_18qf8th" name="Our Escalation" errorCode="26">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:variableName>the_var</spiffworkflow:variableName>
|
||||
</bpmn:extensionElements>
|
||||
</bpmn:escalation>
|
||||
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
|
||||
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="test_process_to_call">
|
||||
<bpmndi:BPMNShape id="Event_03zsjvn_di" bpmnElement="Event_03zsjvn">
|
||||
|
@ -69,10 +74,10 @@
|
|||
<dc:Bounds x="450" y="110" width="100" height="80" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Event_0wfnf83_di" bpmnElement="our_boundary_event">
|
||||
<dc:Bounds x="492" y="172" width="36" height="36" />
|
||||
<bpmndi:BPMNShape id="Event_0bjx9wm_di" bpmnElement="our_boundary_event">
|
||||
<dc:Bounds x="502" y="172" width="36" height="36" />
|
||||
<bpmndi:BPMNLabel>
|
||||
<dc:Bounds x="467" y="215" width="87" height="27" />
|
||||
<dc:Bounds x="477" y="215" width="87" height="27" />
|
||||
</bpmndi:BPMNLabel>
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNEdge id="Flow_1qsx5et_di" bpmnElement="Flow_1qsx5et">
|
||||
|
|
|
@ -2412,8 +2412,6 @@ class TestProcessApi(BaseTest):
|
|||
data: dict = {
|
||||
"correlation_properties": [],
|
||||
"expression": None,
|
||||
"external": True,
|
||||
"internal": False,
|
||||
"payload": {"message": "message 1"},
|
||||
"name": "Message 1",
|
||||
"typename": "MessageEventDefinition",
|
||||
|
|
|
@ -2,6 +2,7 @@ from datetime import datetime
|
|||
from datetime import timezone
|
||||
|
||||
from flask.app import Flask
|
||||
from SpiffWorkflow.bpmn.event import PendingBpmnEvent # type: ignore
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
|
||||
|
||||
|
@ -198,39 +199,47 @@ class TestProcessInstanceService(BaseTest):
|
|||
self._check_sample_file_data_model("File", 1, models[1])
|
||||
|
||||
def test_does_not_skip_events_it_does_not_know_about(self) -> None:
|
||||
name = None
|
||||
event_type = "Unknown"
|
||||
value = "2023-04-27T20:15:10.626656+00:00"
|
||||
pending_event = PendingBpmnEvent(name, event_type, value)
|
||||
assert not (
|
||||
ProcessInstanceService.waiting_event_can_be_skipped(
|
||||
{"event_type": "Unknown", "name": None, "value": "2023-04-27T20:15:10.626656+00:00"},
|
||||
pending_event,
|
||||
datetime.now(timezone.utc),
|
||||
)
|
||||
)
|
||||
|
||||
def test_does_skip_duration_timer_events_for_the_future(self) -> None:
|
||||
name = None
|
||||
event_type = "DurationTimerEventDefinition"
|
||||
value = "2023-04-27T20:15:10.626656+00:00"
|
||||
pending_event = PendingBpmnEvent(name, event_type, value)
|
||||
assert ProcessInstanceService.waiting_event_can_be_skipped(
|
||||
{"event_type": "DurationTimerEventDefinition", "name": None, "value": "2023-04-27T20:15:10.626656+00:00"},
|
||||
pending_event,
|
||||
datetime.fromisoformat("2023-04-26T20:15:10.626656+00:00"),
|
||||
)
|
||||
|
||||
def test_does_not_skip_duration_timer_events_for_the_past(self) -> None:
|
||||
name = None
|
||||
event_type = "DurationTimerEventDefinition"
|
||||
value = "2023-04-27T20:15:10.626656+00:00"
|
||||
pending_event = PendingBpmnEvent(name, event_type, value)
|
||||
assert not (
|
||||
ProcessInstanceService.waiting_event_can_be_skipped(
|
||||
{
|
||||
"event_type": "DurationTimerEventDefinition",
|
||||
"name": None,
|
||||
"value": "2023-04-27T20:15:10.626656+00:00",
|
||||
},
|
||||
pending_event,
|
||||
datetime.fromisoformat("2023-04-28T20:15:10.626656+00:00"),
|
||||
)
|
||||
)
|
||||
|
||||
def test_does_not_skip_duration_timer_events_for_now(self) -> None:
|
||||
name = None
|
||||
event_type = "DurationTimerEventDefinition"
|
||||
value = "2023-04-27T20:15:10.626656+00:00"
|
||||
pending_event = PendingBpmnEvent(name, event_type, value)
|
||||
assert not (
|
||||
ProcessInstanceService.waiting_event_can_be_skipped(
|
||||
{
|
||||
"event_type": "DurationTimerEventDefinition",
|
||||
"name": None,
|
||||
"value": "2023-04-27T20:15:10.626656+00:00",
|
||||
},
|
||||
pending_event,
|
||||
datetime.fromisoformat("2023-04-27T20:15:10.626656+00:00"),
|
||||
)
|
||||
)
|
||||
|
|
|
@ -38375,7 +38375,7 @@
|
|||
},
|
||||
"bpmn-js-spiffworkflow": {
|
||||
"version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#9dcca6c80b8ab8ed0d79658456047b90e8483541",
|
||||
"from": "bpmn-js-spiffworkflow@https://github.com/sartography/bpmn-js-spiffworkflow.git#main",
|
||||
"from": "bpmn-js-spiffworkflow@github:sartography/bpmn-js-spiffworkflow#main",
|
||||
"requires": {
|
||||
"inherits": "^2.0.4",
|
||||
"inherits-browser": "^0.0.1",
|
||||
|
|
|
@ -379,7 +379,9 @@ export default function ReactDiagramEditor({
|
|||
return (
|
||||
!taskSpecsThatCannotBeHighlighted.includes(taskBpmnId) &&
|
||||
!taskBpmnId.match(/EndJoin/) &&
|
||||
!taskBpmnId.match(/BoundaryEventParent/)
|
||||
!taskBpmnId.match(/BoundaryEventParent/) &&
|
||||
!taskBpmnId.match(/BoundaryEventJoin/) &&
|
||||
!taskBpmnId.match(/BoundaryEventSplit/)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue