Event stream (#1993)

* WIP

* Add socket handler for spiff events

* Standardize attributes of event log

* WIP

* Get tests to pass

* Getting pyl to pass

* Moving to regular socket

* Try/catch

* WIP

* Getting dockerized

* WIP

* It's broken

* WIP

* Update event-stream/elasticsearch.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Fix tests

---------

Co-authored-by: Elizabeth Esswein <elizabeth.esswein@gmail.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Kevin Burnett <18027+burnettk@users.noreply.github.com>
jbirddog committed 2024-07-26 11:52:03 -04:00 (committed by GitHub)
parent 0ea0137af8
commit b8c7aa991d
14 changed files with 330 additions and 4 deletions

Makefile
View File

@@ -146,6 +146,8 @@ sh:
take-ownership:
$(SUDO) chown -R $(ME) .
include event-stream/demo.mk
.PHONY: build-images dev-env \
start-dev stop-dev \
be-clear-log-file be-logs be-mypy be-poetry-i be-poetry-lock be-poetry-rm \

event-stream/demo.mk (new file, 79 lines)
View File

@@ -0,0 +1,79 @@
EVENTS_DEMO_CONTAINER ?= spiff-arena-event-stream-1
IN_EVENTS_DEMO ?= $(DOCKER_COMPOSE) run --rm $(EVENTS_DEMO_CONTAINER)
NETWORK ?= spiff-arena_default
ELASTIC_PASSWORD ?= elastic
KIBANA_PASSWORD ?= kibana
net-start:
docker network create $(NETWORK) || true
net-stop:
docker network rm $(NETWORK) || true
elasticsearch-start:
docker run -p 127.0.0.1:9200:9200 -d --name elasticsearch --network $(NETWORK) \
-e ELASTIC_PASSWORD=$(ELASTIC_PASSWORD) \
-e "discovery.type=single-node" \
-e "xpack.security.http.ssl.enabled=false" \
-e "xpack.license.self_generated.type=basic" \
docker.elastic.co/elasticsearch/elasticsearch:8.14.3
elasticsearch-wait-for-boot:
sleep 30
elasticsearch-create-index:
curl -u elastic:$(ELASTIC_PASSWORD) \
-X PUT \
http://localhost:9200/events-index \
-H 'Content-Type: application/json'
elasticsearch-kibana-set-pwd:
curl -u elastic:$(ELASTIC_PASSWORD) \
-X POST \
http://localhost:9200/_security/user/kibana_system/_password \
-d '{"password":"'"$(KIBANA_PASSWORD)"'"}' \
-H 'Content-Type: application/json'
elasticsearch-stop:
docker stop elasticsearch && docker container rm elasticsearch
kibana-start:
docker run -p 127.0.0.1:5601:5601 -d --name kibana --network $(NETWORK) \
-e ELASTICSEARCH_URL=http://elasticsearch:9200 \
-e ELASTICSEARCH_HOSTS=http://elasticsearch:9200 \
-e ELASTICSEARCH_USERNAME=kibana_system \
-e ELASTICSEARCH_PASSWORD=$(KIBANA_PASSWORD) \
-e "xpack.security.enabled=false" \
-e "xpack.license.self_generated.type=trial" \
docker.elastic.co/kibana/kibana:8.14.3
kibana-stop:
docker stop kibana && docker container rm kibana
events-demo-start: net-start \
elasticsearch-start \
elasticsearch-wait-for-boot \
elasticsearch-create-index \
elasticsearch-kibana-set-pwd \
kibana-start
@true
events-demo-stop: kibana-stop elasticsearch-stop net-stop
@true
events-demo-logs:
docker logs -f $(EVENTS_DEMO_CONTAINER)
events-demo-sh:
$(IN_EVENTS_DEMO) /bin/bash
.PHONY: net-start net-stop \
elasticsearch-start \
elasticsearch-wait-for-boot \
elasticsearch-create-index elasticsearch-kibana-set-pwd \
elasticsearch-stop \
kibana-start kibana-stop \
events-demo-start events-demo-stop \
events-demo-logs events-demo-sh
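
Taken together, these targets drive the demo lifecycle: events-demo-start creates the network, boots Elasticsearch, creates the index, sets the kibana_system password, and starts Kibana; events-demo-stop tears it all down. Note that elasticsearch-wait-for-boot is a fixed 30-second sleep rather than a readiness check. A minimal sketch of a polling alternative, assuming the default elastic/elastic credentials and the localhost:9200 port published by elasticsearch-start (this probe is illustrative, not part of the commit):

    # es_wait.py - hypothetical replacement for the fixed "sleep 30"
    import base64
    import time
    import urllib.error
    import urllib.request

    auth = base64.b64encode(b"elastic:elastic").decode()  # elastic:$(ELASTIC_PASSWORD)
    for _ in range(60):
        try:
            req = urllib.request.Request("http://localhost:9200/_cluster/health")
            req.add_header("Authorization", f"Basic {auth}")
            with urllib.request.urlopen(req, timeout=2) as response:
                print(response.read().decode("utf-8"))
                break  # cluster is answering; safe to create the index
        except (urllib.error.URLError, OSError):
            time.sleep(1)  # not up yet; retry for up to a minute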

event-stream/elasticsearch.Dockerfile (new file)
View File

@@ -0,0 +1,13 @@
FROM python:3.12.1-slim-bookworm
WORKDIR /app
RUN apt-get update \
&& apt-get install -y -q \
curl \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY elasticsearch.py elasticsearch.py
CMD ["python", "elasticsearch.py"]

event-stream/elasticsearch.py (new file)
View File

@@ -0,0 +1,78 @@
import json
import os
import socket
import sys
import urllib.request
HOST = os.environ["SPIFFWORKFLOW_EVENT_STREAM_HOST"]
PORT = int(os.environ["SPIFFWORKFLOW_EVENT_STREAM_PORT"])
ELASTICSEARCH_HOST = os.environ["SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_HOST"]
ELASTICSEARCH_PORT = int(os.environ["SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PORT"])
ELASTICSEARCH_USERNAME = os.environ["SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USERNAME"]
ELASTICSEARCH_PASSWORD = os.environ["SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PASSWORD"]
ELASTICSEARCH_INDEX = os.environ.get(
"SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_INDEX",
"spiffworkflow_event_stream"
)
ELASTICSEARCH_USE_HTTPS = os.environ.get(
"SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USE_HTTPS",
"true"
) != "false"
ELASTICSEARCH_PROTOCOL = "https" if ELASTICSEARCH_USE_HTTPS else "http"
ELASTICSEARCH_URL = f"{ELASTICSEARCH_PROTOCOL}://" \
f"{ELASTICSEARCH_HOST}:{ELASTICSEARCH_PORT}/" \
f"{ELASTICSEARCH_INDEX}/_doc"
def init_urllib():
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(
None,
ELASTICSEARCH_URL,
ELASTICSEARCH_USERNAME,
ELASTICSEARCH_PASSWORD,
)
auth_handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
opener = urllib.request.build_opener(auth_handler)
urllib.request.install_opener(opener)
def send_event(event):
try:
post_body = json.dumps(event).encode("utf-8")
request = urllib.request.Request(ELASTICSEARCH_URL)
request.add_header("Content-Type", "application/json")
request.add_header("Content-Length", len(post_body))
response = urllib.request.urlopen(request, post_body).read()
print(response.decode("utf-8"))
except Exception as e:
print(f"ERROR: Failed to send event: {e}", file=sys.stderr)
with socket.create_server((HOST, PORT)) as sock:
init_urllib()
while True:
client_sock, addr = sock.accept()
with client_sock, client_sock.makefile() as fh:
line = fh.readline()
while line:
line = line.strip()
print(line)
try:
request = json.loads(line)
if request["action"] == "add_event":
event = request["event"]
send_event(event)
else:
print(f"ERROR: Unknown action: {request['action']} - {line}", file=sys.stderr)
except Exception as e:
print(f"ERROR: Invalid Request: {e} - {line}", file=sys.stderr)
line = fh.readline()
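
The bridge speaks a newline-delimited JSON protocol: each line is one document with an action field, and add_event forwards the enclosed event to Elasticsearch. A minimal smoke-test client, assuming the service is reachable on localhost:8008 (the docker-compose default added below); this script is illustrative and not part of the commit:

    # send_test_event.py - hypothetical client for the event-stream socket
    import json
    import socket

    message = {
        "action": "add_event",
        "event": {"specversion": "1.0", "type": "spiff.test", "data": {"note": "hello"}},
    }
    with socket.create_connection(("localhost", 8008)) as sock:
        # one JSON document per line, matching fh.readline() on the server
        sock.sendall((json.dumps(message) + "\n").encode("utf-8"))

Lines that are not valid JSON, or whose action is not add_event, are logged to stderr and skipped rather than crashing the accept loop.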

docker-compose.yml
View File

@@ -7,14 +7,38 @@ services:
user: "${RUN_AS}"
environment:
FLASK_DEBUG: "1"
#POETRY_CACHE_DIR: "/app/.cache/poetry"
POETRY_VIRTUALENVS_IN_PROJECT: "true"
SPIFFWORKFLOW_BACKEND_DATABASE_URI: ""
SPIFFWORKFLOW_BACKEND_LOG_LEVEL: "INFO"
SPIFFWORKFLOW_BACKEND_ENV: "${SPIFFWORKFLOW_BACKEND_ENV:-local_development}"
SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA: ""
SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST: "${SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST:-spiff-arena-event-stream-1}"
SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT: "${SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT:-8008}"
XDG_CACHE_HOME: "/app/.cache"
env_file:
- path: .env
required: false
volumes:
- ./spiffworkflow-backend:/app
event-stream:
build:
context: event-stream
dockerfile: elasticsearch.Dockerfile
user: "${RUN_AS}"
environment:
SPIFFWORKFLOW_EVENT_STREAM_HOST: "${SPIFFWORKFLOW_EVENT_STREAM_HOST:-0.0.0.0}"
SPIFFWORKFLOW_EVENT_STREAM_PORT: "${SPIFFWORKFLOW_EVENT_STREAM_PORT:-8008}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_HOST: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_HOST:-elasticsearch}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PORT: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PORT:-9200}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USE_HTTPS: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USE_HTTPS:-false}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USERNAME: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_USERNAME:-elastic}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PASSWORD: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_PASSWORD:-elastic}"
SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_INDEX: "${SPIFFWORKFLOW_EVENT_STREAM_ELASTICSEARCH_INDEX:-spiffworkflow_event_stream}"
XDG_CACHE_HOME: "/app/.cache"
env_file:
- path: .env
required: false
ports:
- "${SPIFFWORKFLOW_EVENT_STREAM_PORT:-8008}:${SPIFFWORKFLOW_EVENT_STREAM_PORT:-8008}/tcp"
volumes:
- ./event-stream:/app
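
Both services default to port 8008, so out of the box the backend's socket handler reaches the bridge at the spiff-arena-event-stream-1 container name over the compose network; any of these values can be overridden via the optional .env file.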

spiffworkflow-backend/poetry.lock
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand.
[[package]]
name = "alembic"
@@ -2958,7 +2958,7 @@ doc = ["sphinx", "sphinx_rtd_theme"]
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "8e000c0f52433a868742482da97abcd395d27dfc"
resolved_reference = "14c84aa26d0485753ed8f174d69fb12a888c455a"
[[package]]
name = "spiffworkflow-connector-command"

spiffworkflow_backend/config/__init__.py
View File

@@ -161,6 +161,8 @@ else:
config_from_env("SPIFFWORKFLOW_BACKEND_LOGGERS_TO_USE")
config_from_env("SPIFFWORKFLOW_BACKEND_LOG_LEVEL", default="info")
config_from_env("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default=False)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT", default=None)
### permissions
config_from_env("SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH")

spiffworkflow_backend/models/process_instance_event.py
View File

@@ -16,6 +16,8 @@ from spiffworkflow_backend.models.user import UserModel
# event types take the form [SUBJECT]_[PAST_TENSE_VERB] since subject is not always the same.
class ProcessInstanceEventType(SpiffEnum):
process_instance_created = "process_instance_created"
process_instance_completed = "process_instance_completed"
process_instance_error = "process_instance_error"
process_instance_force_run = "process_instance_force_run"
process_instance_migrated = "process_instance_migrated"

spiffworkflow_backend/routes/process_instances_controller.py
View File

@@ -33,6 +33,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceApiSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceCannotBeDeletedError
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
from spiffworkflow_backend.models.process_instance_report import Report
@@ -50,6 +51,7 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
from spiffworkflow_backend.services.git_service import GitCommandError
from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.logging_service import LoggingService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsNotEnqueuedError
@@ -67,6 +69,13 @@ def process_instance_create(
process_model_identifier = _un_modify_modified_process_model_id(modified_process_model_identifier)
process_instance = _process_instance_create(process_model_identifier)
LoggingService.log_event(
ProcessInstanceEventType.process_instance_created.value,
process_model_identifier=process_model_identifier,
process_instance_id=process_instance.id,
)
return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
status=201,

spiffworkflow_backend/services/logging_service.py
View File

@@ -3,8 +3,12 @@ import logging
import os
import re
import sys
from datetime import datetime
from datetime import timezone
from logging.handlers import SocketHandler
from typing import Any
from uuid import uuid4
from flask import g
from flask.app import Flask
# flask logging formats:
@@ -23,6 +27,75 @@ class InvalidLogLevelError(Exception):
pass
class SpiffLogHandler(SocketHandler):
def __init__(self, app, *args): # type: ignore
super().__init__(
app.config["SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST"],
app.config["SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT"],
)
self.app = app
def format(self, record: Any) -> str:
return json.dumps(
{
"version": "1.0",
"action": "add_event",
"event": {
"specversion": "1.0",
"type": record.name,
"id": str(uuid4()),
"source": "spiffworkflow.org",
"timestamp": datetime.utcnow().timestamp(),
"data": record._spiff_data,
},
}
)
def get_user_info(self) -> tuple[int | None, str | None]:
try:
return g.user.id, g.user.username
except Exception:
return None, None
def get_default_process_info(self) -> tuple[int | None, str | None]:
try:
tld = self.app.config["THREAD_LOCAL_DATA"]
return tld.process_instance_id, tld.process_model_identifier
except Exception:
return None, None
def filter(self, record: Any) -> bool:
if record.name.startswith("spiff") and getattr(record, "event_type", "") not in ["task_completed", "task_cancelled"]:
user_id, user_name = self.get_user_info()
data = {
"message": record.msg,
"userid": user_id,
"username": user_name,
}
process_instance_id, process_model_identifier = self.get_default_process_info()
if not hasattr(record, "process_instance_id"):
data["process_instance_id"] = process_instance_id
if not hasattr(record, "process_model_identifier"):
data["process_model_identifier"] = process_model_identifier
for attr in ["workflow_spec", "task_spec", "task_id", "task_type"]:
if hasattr(record, attr):
data[attr] = str(getattr(record, attr))
else:
data[attr] = None
record._spiff_data = data
return True
else:
return False
def makePickle(self, record: Any) -> bytes: # noqa: N802
# Instead of returning a pickled log record, write the json entry to the socket
return (self.format(record) + "\n").encode("utf-8")
# originally from https://stackoverflow.com/a/70223539/6090676
@@ -100,7 +173,7 @@ def setup_logger_for_app(app: Flask, primary_logger: Any, force_run_with_celery:
log_level = logging.getLevelName(upper_log_level_string)
log_formatter = get_log_formatter(app)
app.logger.debug("Printing log to create app logger")
app.logger.debug(f"Printing log to create app logger with level: {upper_log_level_string}")
spiff_logger_filehandler = None
if app.config["SPIFFWORKFLOW_BACKEND_LOG_TO_FILE"]:
@@ -180,6 +253,13 @@
level_number = logging.getLevelName(log_level_to_use)
logger_for_name.setLevel(level_number)
if app.config["SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST"] is not None:
spiff_logger = logging.getLogger("spiff")
spiff_logger.setLevel(logging.INFO)
spiff_logger.propagate = False
handler = SpiffLogHandler(app) # type: ignore
spiff_logger.addHandler(handler)
def get_log_formatter(app: Flask) -> logging.Formatter:
log_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -201,3 +281,28 @@
)
log_formatter = json_formatter
return log_formatter
class LoggingService:
_spiff_logger = logging.getLogger("spiff")
@classmethod
def log_event(
cls,
event_type: str,
task_guid: str | None = None,
process_model_identifier: str | None = None,
process_instance_id: int | None = None,
) -> None:
extra: dict[str, Any] = {"event_type": event_type}
if task_guid is not None:
extra["task_guid"] = task_guid
if process_model_identifier is not None:
extra["process_model_Identifier"] = process_model_identifier
if process_instance_id is not None:
extra["process_instance_id"] = process_instance_id
cls._spiff_logger.info(event_type, extra=extra)
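
Tracing the process_instance_completed event from the processor through filter() and format(): the record carries no explicit process attributes, so filter() fills them from the thread-local fallback before format() wraps everything in a CloudEvents-style envelope. A sketch of the JSON document written to the socket, shown as a Python literal with illustrative values:

    payload = {
        "version": "1.0",
        "action": "add_event",
        "event": {
            "specversion": "1.0",
            "type": "spiff",  # the logger name
            "id": "8b9f5f0e-...",  # fresh uuid4 per record
            "source": "spiffworkflow.org",
            "timestamp": 1722008000.0,  # seconds since the epoch
            "data": {
                "message": "process_instance_completed",
                "userid": 1,  # from flask.g when a user is present
                "username": "admin",
                "process_instance_id": 42,  # thread-local fallback
                "process_model_identifier": "my-group/my-model",
                "workflow_spec": None,  # the four Spiff task attributes,
                "task_spec": None,      # None when absent from the record
                "task_id": None,
                "task_type": None,
            },
        },
    }

Note that attributes passed via extra live on the record itself; filter() only copies the thread-local fallbacks into data when the record lacks them.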

spiffworkflow_backend/services/process_instance_processor.py
View File

@@ -95,6 +95,7 @@ from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.jinja_service import JinjaHelpers
from spiffworkflow_backend.services.logging_service import LoggingService
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@@ -1174,6 +1175,9 @@ class ProcessInstanceProcessor:
self.process_instance_model.end_in_seconds = round(time.time())
if self._workflow_completed_handler is not None:
self._workflow_completed_handler(self.process_instance_model)
LoggingService.log_event(
ProcessInstanceEventType.process_instance_completed.value,
)
db.session.add(self.process_instance_model)

spiffworkflow_backend/services/process_instance_tmp_service.py
View File

@@ -11,6 +11,7 @@ from spiffworkflow_backend.models.process_instance_event import ProcessInstanceE
from spiffworkflow_backend.models.process_instance_migration_detail import ProcessInstanceMigrationDetailDict
from spiffworkflow_backend.models.process_instance_migration_detail import ProcessInstanceMigrationDetailModel
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from spiffworkflow_backend.services.logging_service import LoggingService
class ProcessInstanceTmpService:
@@ -84,10 +85,13 @@
if add_to_db_session:
db.session.add(process_instance_error_detail)
LoggingService.log_event(event_type, task_guid)
if migration_details is not None:
pi_detail = cls.add_process_instance_migration_detail(process_instance_event, migration_details)
if add_to_db_session:
db.session.add(pi_detail)
return (process_instance_event, process_instance_error_detail)
@classmethod
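
With this single LoggingService.log_event call, every process instance event persisted through add_event (task failures, migrations, and the rest) is mirrored to the event stream as well, complementing the explicit created and completed events added in the route and processor above.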

View File

@@ -2,6 +2,7 @@
"data": {},
"correlations": {},
"last_task": "f67c1c06-454c-40d1-b26c-74172139e729",
"completed": false,
"success": true,
"tasks": {
"1d7372b6-c4f0-4e00-ada9-2c47380d8489": {

View File

@@ -7,6 +7,7 @@
"correlations": {},
"last_task": "a8052b4d-65ed-4e55-8233-062113ebe18f",
"success": true,
"completed": false,
"tasks": {
"098e4fc2-a399-4325-b0a9-76d6c330fbf4": {
"id": "098e4fc2-a399-4325-b0a9-76d6c330fbf4",
@@ -635,6 +636,7 @@
},
"correlations": {},
"last_task": "5eb9e777-cfbf-4ef9-8ba8-79fa5d172b7e",
"completed": false,
"success": true,
"tasks": {
"0315382d-fdf6-4c27-8d7d-63dddf0b05fb": {
@@ -761,6 +763,7 @@
},
"correlations": {},
"last_task": "af12522c-811b-4258-a569-65890838677f",
"completed": false,
"success": true,
"tasks": {
"6e6ad5c3-e701-4b59-8a81-4ed2c63bd0e1": {