fall back to initiator id for current user if g.user is not set and disable spiff logging when initializing workflows from bpmn_json

jasquat 2022-09-15 17:29:41 -04:00
parent 15b5a271ae
commit 04bb8efff4
6 changed files with 36 additions and 37 deletions

View File

@@ -36,7 +36,7 @@ def start_scheduler(app: flask.app.Flask) -> None:
scheduler.add_job(
MessageServiceWithAppContext(app).process_message_instances_with_app_context,
"interval",
minutes=1,
seconds=10,
)
scheduler.start()
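The scheduler change above tightens the message-processing poll from every minute to every ten seconds. A minimal sketch of the same trigger, assuming APScheduler's BackgroundScheduler; poll_message_instances is a stand-in for process_message_instances_with_app_context:

```python
# Minimal sketch of the new trigger, assuming APScheduler's BackgroundScheduler;
# poll_message_instances stands in for process_message_instances_with_app_context.
import time

from apscheduler.schedulers.background import BackgroundScheduler


def poll_message_instances() -> None:
    print("checking for queued message instances")


scheduler = BackgroundScheduler()
# "interval" with seconds=10 replaces the previous minutes=1 trigger
scheduler.add_job(poll_message_instances, "interval", seconds=10)
scheduler.start()

try:
    time.sleep(30)  # let the job fire a few times
finally:
    scheduler.shutdown()
```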

View File

@@ -15,6 +15,7 @@ from flask import g
from flask import jsonify
from flask import make_response
from flask import request
from flask import current_app
from flask.wrappers import Response
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
@@ -344,6 +345,9 @@ def process_instance_run(
processor.save()
ProcessInstanceService.update_task_assignments(processor)
if not current_app.config["PROCESS_WAITING_MESSAGES"]:
MessageService.process_message_instances()
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(
processor
)
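The new config check lets a deployment that is not running the background scheduler (PROCESS_WAITING_MESSAGES falsy) drain queued message instances inline right after a run. A rough sketch of that gate, assuming a plain Flask app; only the PROCESS_WAITING_MESSAGES key comes from the commit, the rest are illustrative stand-ins:

```python
# Rough sketch of the config gate; names other than PROCESS_WAITING_MESSAGES are stand-ins.
from flask import Flask

app = Flask(__name__)
app.config["PROCESS_WAITING_MESSAGES"] = False  # e.g. a deployment without the scheduler


def process_message_instances() -> None:
    # stand-in for MessageService.process_message_instances
    print("processing queued message instances inline")


def after_process_instance_run() -> None:
    # mirrors the check added in process_instance_run
    if not app.config["PROCESS_WAITING_MESSAGES"]:
        process_message_instances()


with app.app_context():
    after_process_instance_run()
```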

View File

@@ -105,6 +105,7 @@ class SpiffFilter(logging.Filter):
def setup_logger(app: Flask) -> None:
"""Setup_logger."""
log_level = logging.DEBUG
spiff_log_level = logging.DEBUG
log_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
@@ -129,7 +130,7 @@ def setup_logger(app: Flask) -> None:
spiff_logger_filehandler = None
if app.config["SPIFFWORKFLOW_BACKEND_LOG_TO_FILE"]:
spiff_logger_filehandler = logging.FileHandler(f"log/{app.env}.log")
spiff_logger_filehandler.setLevel(logging.DEBUG)
spiff_logger_filehandler.setLevel(spiff_log_level)
spiff_logger_filehandler.setFormatter(log_formatter)
# make all loggers act the same
@@ -147,7 +148,7 @@ def setup_logger(app: Flask) -> None:
the_handler.setLevel(log_level)
spiff_logger = logging.getLogger("spiff")
spiff_logger.setLevel(logging.DEBUG)
spiff_logger.setLevel(spiff_log_level)
spiff_formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s | %(action)s | %(task_type)s | %(process)s | %(processName)s | %(process_instance_id)s"
)
@@ -156,7 +157,7 @@ def setup_logger(app: Flask) -> None:
# if you add a filter to the spiff logger directly (and not the handler), it will NOT be inherited by spiff.metrics
# so put filters on handlers.
db_handler = DBHandler()
db_handler.setLevel(logging.DEBUG)
db_handler.setLevel(spiff_log_level)
db_handler.setFormatter(spiff_formatter)
db_handler.addFilter(SpiffFilter(app))
spiff_logger.addHandler(db_handler)
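The handlers above now share a single spiff_log_level variable instead of repeating logging.DEBUG. A sketch of how that variable could later be driven by configuration; the SPIFFWORKFLOW_BACKEND_SPIFF_LOG_LEVEL environment variable is hypothetical and not part of this commit:

```python
# Sketch only: one possible way to make spiff_log_level configurable later.
# SPIFFWORKFLOW_BACKEND_SPIFF_LOG_LEVEL is a hypothetical setting, not part of this commit.
import logging
import os

spiff_log_level = getattr(
    logging,
    os.environ.get("SPIFFWORKFLOW_BACKEND_SPIFF_LOG_LEVEL", "DEBUG").upper(),
    logging.DEBUG,
)

spiff_logger = logging.getLogger("spiff")
spiff_logger.setLevel(spiff_log_level)  # one knob shared by the file and DB handlers
```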

View File

@@ -138,15 +138,13 @@ class MessageService:
process_group_identifier=message_triggerable_process_model.process_group_identifier,
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.do_engine_steps()
processor_receive.do_engine_steps(save=False)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
)
processor_receive.save()
processor_receive.do_engine_steps()
processor_receive.save()
processor_receive.do_engine_steps(save=True)
return process_instance_receive
@@ -174,8 +172,7 @@
message_payload,
correlations={},
)
processor_receive.do_engine_steps()
processor_receive.save()
processor_receive.do_engine_steps(save=True)
@staticmethod
def get_message_instance_receive(
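Both call sites above now pass a save flag to do_engine_steps instead of calling processor_receive.save() separately. A simplified stand-in showing the pattern that flag implies; the real method belongs to ProcessInstanceProcessor and also serializes bpmn_json and updates task assignments:

```python
# Stand-in sketch of the save flag on do_engine_steps; not the real implementation.
class ProcessorSketch:
    def _run_ready_engine_tasks(self) -> None:
        print("running ready engine tasks")

    def save(self) -> None:
        print("persisting serialized workflow state")

    def do_engine_steps(self, save: bool = False) -> None:
        self._run_ready_engine_tasks()
        if save:
            self.save()


processor_receive = ProcessorSketch()
processor_receive.do_engine_steps(save=False)  # catch_bpmn_message happens in between
processor_receive.do_engine_steps(save=True)   # single persisted state at the end
```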

View File

@@ -1,5 +1,6 @@
"""Process_instance_processor."""
import json
import logging
import os
import time
from typing import Any
@@ -157,7 +158,7 @@ class ProcessInstanceProcessor:
]
)
_serializer = BpmnWorkflowSerializer(wf_spec_converter, version=SERIALIZER_VERSION)
_old_serializer = BpmnSerializer()
PROCESS_INSTANCE_ID_KEY = "process_instance_id"
VALIDATION_PROCESS_KEY = "validate_only"
@@ -260,14 +261,6 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance.data[
ProcessInstanceProcessor.PROCESS_INSTANCE_ID_KEY
] = process_instance_model.id
# FIXME: This also seems to happen in the save method below
process_instance_model.bpmn_json = (
ProcessInstanceProcessor._serializer.serialize_json(
self.bpmn_process_instance
)
)
self.save()
except MissingSpecError as ke:
@@ -278,11 +271,19 @@ class ProcessInstanceProcessor:
% (self.process_model_identifier, str(ke)),
) from ke
@staticmethod
def add_user_info_to_process_instance(bpmn_process_instance: BpmnWorkflow) -> None:
def add_user_info_to_process_instance(self, bpmn_process_instance: BpmnWorkflow) -> None:
"""Add_user_info_to_process_instance."""
current_user = None
if UserService.has_user():
current_user = UserService.current_user(allow_admin_impersonate=True)
# fall back to initiator if g.user is not set
# this is for background processes when there will not be a user
# coming in from the api
elif self.process_instance_model.process_initiator_id:
current_user = self.process_instance_model.process_initiator
if current_user:
current_user_data = UserModelSchema().dump(current_user)
tasks = bpmn_process_instance.get_tasks(TaskState.READY)
for task in tasks:
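The elif branch above is the commit's fallback to the process initiator when g.user is not set, which is the case for background runs. A self-contained sketch of that resolution order, using stand-in dataclasses rather than the real UserModel and ProcessInstanceModel:

```python
# Stand-in sketch of the current-user fallback: prefer the user on the request
# (g.user), otherwise use the stored process initiator so background runs still
# have a user to stamp onto READY tasks. User and ProcessInstance are illustrative.
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    id: int
    username: str


@dataclass
class ProcessInstance:
    process_initiator: Optional[User] = None


def resolve_current_user(
    request_user: Optional[User], process_instance: ProcessInstance
) -> Optional[User]:
    if request_user is not None:  # normal API call: g.user is set
        return request_user
    return process_instance.process_initiator  # background run: fall back to the initiator


instance = ProcessInstance(process_initiator=User(id=1, username="initiator"))
print(resolve_current_user(None, instance))  # -> the initiator, not None
```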
@@ -350,19 +351,17 @@ class ProcessInstanceProcessor:
) -> BpmnWorkflow:
"""__get_bpmn_process_instance."""
if process_instance_model.bpmn_json:
version = ProcessInstanceProcessor._serializer.get_version(
process_instance_model.bpmn_json
# turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff")
original_spiff_logger_log_level = spiff_logger.level
spiff_logger.setLevel(logging.WARNING)
bpmn_process_instance = (
ProcessInstanceProcessor._serializer.deserialize_json(
process_instance_model.bpmn_json
)
)
if version == ProcessInstanceProcessor.SERIALIZER_VERSION:
bpmn_process_instance = (
ProcessInstanceProcessor._serializer.deserialize_json(
process_instance_model.bpmn_json
)
)
else:
bpmn_process_instance = ProcessInstanceProcessor._old_serializer.deserialize_process_instance(
process_instance_model.bpmn_json, process_model=spec
)
spiff_logger.setLevel(original_spiff_logger_log_level)
bpmn_process_instance.script_engine = (
ProcessInstanceProcessor._script_engine
)
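Deserialization now records the spiff logger's level, raises it to WARNING, and restores it afterwards so replaying the workflow from bpmn_json does not duplicate spiff log rows. The same save-and-restore could be expressed as a context manager; this is an alternative sketch, not what the commit ships:

```python
# Alternative sketch: the same save/raise/restore of the spiff logger level,
# written as a context manager instead of inline statements.
import logging
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def logger_level(name: str, level: int) -> Iterator[None]:
    logger = logging.getLogger(name)
    original_level = logger.level
    logger.setLevel(level)
    try:
        yield
    finally:
        logger.setLevel(original_level)  # restore even if deserialization raises


with logger_level("spiff", logging.WARNING):
    pass  # deserialize_json(...) would run here without emitting duplicate spiff log rows
```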
@@ -415,7 +414,6 @@ class ProcessInstanceProcessor:
if not self.bpmn_process_instance._is_engine_task(
ready_or_waiting_task.task_spec
):
user_id = ready_or_waiting_task.data["current_user"]["id"]
principal = PrincipalModel.query.filter_by(user_id=user_id).first()
if principal is None:
@@ -953,11 +951,11 @@ class ProcessInstanceProcessor:
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [t for t in all_tasks if t.state in [TaskState.WAITING, TaskState.READY]]
def get_task_by_id(self, task_id: str) -> SpiffTask:
def get_task_by_bpmn_identifier(self, bpmn_task_identifier: str) -> SpiffTask:
"""Get_task_by_id."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
for task in all_tasks:
if task.id == task_id:
if task.task_spec.name == bpmn_task_identifier:
return task
return None
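The renamed get_task_by_bpmn_identifier matches on the task spec's BPMN name rather than the internal task id. A stand-alone sketch of that lookup, with stand-in objects in place of SpiffWorkflow's task classes:

```python
# Stand-alone sketch of the renamed lookup: match on task_spec.name (the BPMN
# identifier) instead of the internal task id. Task and TaskSpec are stand-ins.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskSpec:
    name: str  # the BPMN element identifier


@dataclass
class Task:
    id: str
    task_spec: TaskSpec


def get_task_by_bpmn_identifier(
    tasks: List[Task], bpmn_task_identifier: str
) -> Optional[Task]:
    for task in tasks:
        if task.task_spec.name == bpmn_task_identifier:
            return task
    return None


tasks = [Task(id="some-uuid", task_spec=TaskSpec(name="manual_task_one"))]
print(get_task_by_bpmn_identifier(tasks, "manual_task_one"))
```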

View File

@@ -485,7 +485,6 @@ class SpecFileService(FileSystemService):
f"Could not find message model with identifier '{message_model_identifier}'"
f"specified by correlation property: {cpre}"
)
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=correlation_identifier,