feat(workflows): add Vector log parsing workflow template and synchronization

Add workflow template for parsing logs collected by Vector from Kubernetes
pods, with semaphore synchronization to prevent concurrent access conflicts.

- log-parsing-workflow-template-vector: New workflow template that scales
  down Vector aggregator to access RWO PVC, parses JSONL logs, then scales
  aggregator back up
- vector-log-parsing-semaphore: ConfigMap semaphore limiting to one log
  parsing workflow at a time (prevents RWO PVC mount conflicts)
- codex-workflows-rbac: Added deployment get/patch/update permissions to
  executor role (required for scaling Vector aggregator)

Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
Chrysostomos Nanakos 2025-10-21 13:25:00 +03:00
parent 0c8e28fa46
commit 200c749cb5
4 changed files with 165 additions and 3 deletions
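
Because the semaphore admits a single holder, log-parsing runs that start while another run holds the lock queue up rather than fail. A quick way to watch this, assuming the standard argo CLI and that the workflows run in the argo namespace (the workflow name below is a placeholder):

    # List workflows; a queued parsing run typically stays pending until the
    # current semaphore holder releases the lock.
    argo list -n argo

    # Inspect one run, including its synchronization/lock status.
    argo get -n argo <log-parsing-workflow-name>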


@@ -45,7 +45,9 @@ spec:
value: "false"
- name: codexLogLevel
value: "INFO"
# value: "DEBUG;trace:swarm\\,blockexcnetworkpeer" # make sure to escape commas or helm will fail
# value: "DEBUG;trace:blockexcengine\\,blockexcnetworkpeer\\,blockexcnetwork\\,discoveryengine" # make sure to escape commas or helm will fail
- name: codexMemory
value: ""
###################################### Experiment Retries #############################################
# Allow the workflow to replay failed experiments from a previous run instead of running a new set.
@@ -97,6 +99,9 @@ spec:
# disable this when running local experiments.
- name: parseLogs
value: "false"
# Which log parsing workflow template to use (one of: log-parsing-workflow, log-parsing-workflow-vector)
- name: logParsingWorkflow
value: "log-parsing-workflow-vector"
#######################################################################################################
@@ -262,7 +267,7 @@ spec:
generateName: log-parsing-
spec:
workflowTemplateRef:
name: log-parsing-workflow
name: {{workflow.parameters.logParsingWorkflow}}
arguments:
parameters:
- name: experimentGroupId
@@ -496,7 +501,8 @@ spec:
--set "experiment.codexLogLevel={{workflow.parameters.codexLogLevel}}"\
--set experiment.seederSets={{inputs.parameters.seederSets}}\
--set deployment.minikubeEnv={{workflow.parameters.minikubeEnv}}\
--set deployment.removeData={{workflow.parameters.removeData}}\
--set experiment.removeData={{workflow.parameters.removeData}}\
--set experiment.memory={{workflow.parameters.codexMemory}}\
--set deployment.nodeTag={{workflow.parameters.nodeTag}}\
--set deployment.runnerTag={{workflow.parameters.runnerTag}}\
--set deployment.region={{workflow.parameters.region}}
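
With the template reference parameterized above, the same experiment workflow can fall back to the original parser without editing the spec. A hedged sketch, assuming the benchmark workflow is submitted with the argo CLI and <benchmark-template> stands in for its real WorkflowTemplate name:

    # Run with the pre-Vector parser instead of log-parsing-workflow-vector.
    argo submit -n argo --from workflowtemplate/<benchmark-template> \
      -p parseLogs=true \
      -p logParsingWorkflow=log-parsing-workflow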


@@ -40,6 +40,9 @@ rules:
- apiGroups: [ "argoproj.io" ]
resources: [ "workflowtaskresults", "workflows" ]
verbs: [ "create", "patch", "get", "list" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "get", "patch", "update" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
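
The extra apps/deployments verbs exist only so the executor can run the scale-down and scale-up resource steps of the Vector parsing template. Roughly the equivalent operation and permission check with kubectl, assuming the role is bound to the codex-benchmarks-workflows service account in the argo namespace (the binding itself is not shown here):

    # What the scale-down step effectively does.
    kubectl -n argo patch deployment vector-aggregator \
      --type merge -p '{"spec":{"replicas":0}}'

    # Confirm the service account is allowed to perform the patch.
    kubectl auth can-i patch deployments.apps -n argo \
      --as system:serviceaccount:argo:codex-benchmarks-workflows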


@@ -0,0 +1,142 @@
# Workflow template for parsing logs for an experiment group using Vector.
#
# Vector collects logs from Kubernetes pods and writes them to a PVC as JSONL
# files. This workflow scales down the Vector aggregator to access the RWO PVC,
# parses the logs, then scales the aggregator back up.
#
# Uses synchronization to ensure only one workflow can parse logs at a time,
# preventing conflicts when multiple experiments finish simultaneously.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: log-parsing-workflow-vector
spec:
serviceAccountName: codex-benchmarks-workflows
entrypoint: log-parsing-workflow
# Synchronization: Only one workflow can access vector-logs-pvc at a time
synchronization:
semaphore:
configMapKeyRef:
name: vector-log-parsing-semaphore
key: workflow
  # Timeout for the entire workflow (2 hours)
activeDeadlineSeconds: 7200
# Sadly we need a PVC to share data among steps. This is a limitation of Argo.
volumeClaimTemplates:
- metadata:
name: logs
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 50Gi
storageClassName: do-block-storage
arguments:
parameters:
- name: experimentGroupId
- name: bucket
value: "codex-benchmarks"
- name: vectorLogsPath
value: "/mnt/vector-logs"
volumes:
- name: vector-logs
persistentVolumeClaim:
claimName: vector-logs-pvc
templates:
- name: log-parsing-workflow
steps:
- - name: scale-down-vector
template: scale-down-vector
- - name: parse-logs
template: parse-logs
- - name: scale-up-vector
template: scale-up-vector
- - name: tar-and-upload
template: tar-and-upload
- name: parse-logs
script:
image: codexstorage/bittorrent-benchmarks:latest
command: ["/bin/bash"]
source: |
set -e
poetry run python -m benchmarks.cli logs source \
--output-dir "/var/logs/{{workflow.parameters.experimentGroupId}}" \
"{{workflow.parameters.experimentGroupId}}" \
vector \
{{workflow.parameters.vectorLogsPath}}/benchmarks-*.jsonl \
--chronological
volumeMounts:
- name: logs
mountPath: "/var/logs"
- name: vector-logs
mountPath: "{{workflow.parameters.vectorLogsPath}}"
readOnly: true
- name: tar-and-upload
script:
image: codexstorage/bittorrent-benchmarks-workflows:latest
command: ["/bin/bash"]
source: |
set -e
if [ -z "$(ls /var/logs/{{workflow.parameters.experimentGroupId}})" ]; then
echo "No logs found."
exit 1
fi
echo "Creating tarball."
tar -czvf \
"/var/logs/{{workflow.parameters.experimentGroupId}}.tar.gz" \
-C /var/logs \
"{{workflow.parameters.experimentGroupId}}"
echo "Configure s3 alias for endpoint ${AWS_ENDPOINT_URL}."
mc alias set s3 "${AWS_ENDPOINT_URL}" "${AWS_ACCESS_KEY_ID}" "${AWS_SECRET_ACCESS_KEY}"
echo "Copy logs."
mc cp "/var/logs/{{workflow.parameters.experimentGroupId}}.tar.gz"\
"s3/{{workflow.parameters.bucket}}/logs/{{workflow.parameters.experimentGroupId}}.tar.gz"
envFrom:
- secretRef:
name: s3-codex-benchmarks
volumeMounts:
- name: logs
mountPath: "/var/logs"
- name: scale-down-vector
resource:
action: patch
manifest: |
apiVersion: apps/v1
kind: Deployment
metadata:
name: vector-aggregator
namespace: argo
spec:
replicas: 0
- name: scale-up-vector
resource:
action: patch
manifest: |
apiVersion: apps/v1
kind: Deployment
metadata:
name: vector-aggregator
namespace: argo
spec:
replicas: 1
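
The template can also be run on its own for an experiment group whose logs are already on the Vector PVC; a minimal sketch, assuming the argo CLI and that the template is installed in the argo namespace (the group id is a placeholder):

    argo submit -n argo \
      --from workflowtemplate/log-parsing-workflow-vector \
      -p experimentGroupId=<experiment-group-id>

Only experimentGroupId is required; bucket and vectorLogsPath fall back to the defaults declared above.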


@@ -0,0 +1,11 @@
# Semaphore for Vector log parsing workflow synchronization
# Ensures only one log parsing workflow can run at a time to avoid conflicts
# with the RWO vector-logs-pvc and Vector aggregator scaling
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-log-parsing-semaphore
namespace: argo
data:
workflow: "1" # Only 1 workflow can hold this semaphore at a time
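
The data value is the semaphore's concurrency limit, which Argo reads from this ConfigMap when a workflow tries to acquire the lock. Raising it would allow parallel parsing runs but defeats the purpose here, since the RWO PVC and the aggregator scaling both assume a single client. If it ever needs changing, a hedged sketch with kubectl:

    # Bump the limit (not recommended while vector-logs-pvc stays ReadWriteOnce).
    kubectl -n argo patch configmap vector-log-parsing-semaphore \
      --type merge -p '{"data":{"workflow":"2"}}'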