From 8456245714d817205cb215b3a03d3206741eeefe Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 19 Jun 2026 08:06:11 +0200 Subject: [PATCH] Structured Broker API: interface/impl split + nim-brokers integration Squash of the initial structured-API POC work (1d684bf8..c37dcf84): - WIP: structured Broker API interface + impl (Logos Delivery facade) - Restructured folders; interfaces under logos_delivery/api; MessagingClient and ReliableChannelManager reworked as BrokerInterface/BrokerImplement - API types elevated under logos_delivery/api/types.nim - onelogosdelivery target; wakunode2 / FFI lib build fixes - nim-brokers integration + version bumps; git_version into FFI lib version - compile fixes for tests --- .claude/skills/gitnexus/gitnexus-cli/SKILL.md | 83 ++++ .../gitnexus/gitnexus-debugging/SKILL.md | 89 ++++ .../gitnexus/gitnexus-exploring/SKILL.md | 78 ++++ .../skills/gitnexus/gitnexus-guide/SKILL.md | 64 +++ .../gitnexus-impact-analysis/SKILL.md | 97 +++++ .../gitnexus/gitnexus-refactoring/SKILL.md | 121 ++++++ AGENTS.md | 6 +- CLAUDE.md | 44 ++ Makefile | 8 +- examples/api_example/api_example.nim | 4 +- library/logos_delivery_api/node_api.nim | 6 +- logos_delivery.nim | 25 ++ logos_delivery.nimble | 49 ++- logos_delivery/api/kernel_interface.nim | 182 ++++++++ .../api/logos_delivery_interface.nim | 51 +++ .../api/messaging_client_interface.nim | 48 +++ .../reliable_channel_manager_interface.nim | 48 +++ logos_delivery/api/types.nim | 129 ++++++ logos_delivery/channels/events.nim | 26 +- logos_delivery/channels/reliable_channel.nim | 104 ++--- .../channels/reliable_channel_manager.nim | 176 ++++---- logos_delivery/channels/types.nim | 9 +- logos_delivery/logos_delivery.nim | 215 ++++++++++ .../recv_service/recv_service.nim | 7 +- .../send_service/send_service.nim | 20 +- logos_delivery/messaging/messaging_client.nim | 68 +-- logos_delivery/waku/api/api.nim | 4 + logos_delivery/waku/api/api_conf.nim | 7 +- logos_delivery/waku/api/types.nim | 75 +--- logos_delivery/waku/events/health_events.nim | 6 +- logos_delivery/waku/events/message_events.nim | 31 +- logos_delivery/waku/factory/waku.nim | 29 -- .../waku/factory/waku_state_info.nim | 17 +- .../health_monitor/node_health_monitor.nim | 10 +- .../waku/node/subscription_manager.nim | 2 +- .../waku/waku_core/message/message.nim | 24 +- .../waku/waku_core/topics/content_topic.nim | 5 +- logos_delivery/waku/waku_relay/protocol.nim | 2 +- nim_brokers_instructions.md | 395 ++++++++++++++++++ nimble.lock | 6 +- tests/api/test_api_health.nim | 24 +- tests/api/test_node_conf.nim | 16 +- tests/node/test_wakunode_health_monitor.nim | 14 +- tests/test_waku.nim | 6 +- tools/confutils/cli_args.nim | 74 ++-- 45 files changed, 2037 insertions(+), 467 deletions(-) create mode 100644 .claude/skills/gitnexus/gitnexus-cli/SKILL.md create mode 100644 .claude/skills/gitnexus/gitnexus-debugging/SKILL.md create mode 100644 .claude/skills/gitnexus/gitnexus-exploring/SKILL.md create mode 100644 .claude/skills/gitnexus/gitnexus-guide/SKILL.md create mode 100644 .claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md create mode 100644 .claude/skills/gitnexus/gitnexus-refactoring/SKILL.md create mode 100644 logos_delivery/api/kernel_interface.nim create mode 100644 logos_delivery/api/logos_delivery_interface.nim create mode 100644 logos_delivery/api/messaging_client_interface.nim create mode 100644 logos_delivery/api/reliable_channel_manager_interface.nim create mode 100644 logos_delivery/api/types.nim create mode 100644 logos_delivery/logos_delivery.nim create mode 100644 nim_brokers_instructions.md diff --git a/.claude/skills/gitnexus/gitnexus-cli/SKILL.md b/.claude/skills/gitnexus/gitnexus-cli/SKILL.md new file mode 100644 index 000000000..cd9a83be0 --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-cli/SKILL.md @@ -0,0 +1,83 @@ +--- +name: gitnexus-cli +description: "Use when the user needs to run GitNexus CLI commands like analyze/index a repo, check status, clean the index, generate a wiki, or list indexed repos. Examples: \"Index this repo\", \"Reanalyze the codebase\", \"Generate a wiki\"" +--- + +# GitNexus CLI Commands + +All commands work via `npx` — no global install required. + +## Commands + +### analyze — Build or refresh the index + +```bash +npx gitnexus analyze +``` + +Run from the project root. This parses all source files, builds the knowledge graph, writes it to `.gitnexus/`, and generates CLAUDE.md / AGENTS.md context files. + +| Flag | Effect | +| -------------- | ---------------------------------------------------------------- | +| `--force` | Force full re-index even if up to date | +| `--embeddings` | Enable embedding generation for semantic search (off by default) | +| `--drop-embeddings` | Drop existing embeddings on rebuild. By default, an `analyze` without `--embeddings` preserves them. | + +**When to run:** First time in a project, after major code changes, or when `gitnexus://repo/{name}/context` reports the index is stale. In Claude Code, a PostToolUse hook detects staleness after `git commit` and `git merge` and notifies the agent to run `analyze` — the hook does not run analyze itself, to avoid blocking the agent for up to 120s and risking KuzuDB corruption on timeout. + +### status — Check index freshness + +```bash +npx gitnexus status +``` + +Shows whether the current repo has a GitNexus index, when it was last updated, and symbol/relationship counts. Use this to check if re-indexing is needed. + +### clean — Delete the index + +```bash +npx gitnexus clean +``` + +Deletes the `.gitnexus/` directory and unregisters the repo from the global registry. Use before re-indexing if the index is corrupt or after removing GitNexus from a project. + +| Flag | Effect | +| --------- | ------------------------------------------------- | +| `--force` | Skip confirmation prompt | +| `--all` | Clean all indexed repos, not just the current one | + +### wiki — Generate documentation from the graph + +```bash +npx gitnexus wiki +``` + +Generates repository documentation from the knowledge graph using an LLM. Requires an API key (saved to `~/.gitnexus/config.json` on first use). + +| Flag | Effect | +| ------------------- | ----------------------------------------- | +| `--force` | Force full regeneration | +| `--model ` | LLM model (default: minimax/minimax-m2.5) | +| `--base-url ` | LLM API base URL | +| `--api-key ` | LLM API key | +| `--concurrency ` | Parallel LLM calls (default: 3) | +| `--gist` | Publish wiki as a public GitHub Gist | + +### list — Show all indexed repos + +```bash +npx gitnexus list +``` + +Lists all repositories registered in `~/.gitnexus/registry.json`. The MCP `list_repos` tool provides the same information. + +## After Indexing + +1. **Read `gitnexus://repo/{name}/context`** to verify the index loaded +2. Use the other GitNexus skills (`exploring`, `debugging`, `impact-analysis`, `refactoring`) for your task + +## Troubleshooting + +- **"Not inside a git repository"**: Run from a directory inside a git repo +- **Index is stale after re-analyzing**: Restart Claude Code to reload the MCP server +- **Embeddings slow**: Omit `--embeddings` (it's off by default) or set `OPENAI_API_KEY` for faster API-based embedding diff --git a/.claude/skills/gitnexus/gitnexus-debugging/SKILL.md b/.claude/skills/gitnexus/gitnexus-debugging/SKILL.md new file mode 100644 index 000000000..9510b97ac --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-debugging/SKILL.md @@ -0,0 +1,89 @@ +--- +name: gitnexus-debugging +description: "Use when the user is debugging a bug, tracing an error, or asking why something fails. Examples: \"Why is X failing?\", \"Where does this error come from?\", \"Trace this bug\"" +--- + +# Debugging with GitNexus + +## When to Use + +- "Why is this function failing?" +- "Trace where this error comes from" +- "Who calls this method?" +- "This endpoint returns 500" +- Investigating bugs, errors, or unexpected behavior + +## Workflow + +``` +1. gitnexus_query({query: ""}) → Find related execution flows +2. gitnexus_context({name: ""}) → See callers/callees/processes +3. READ gitnexus://repo/{name}/process/{name} → Trace execution flow +4. gitnexus_cypher({query: "MATCH path..."}) → Custom traces if needed +``` + +> If "Index is stale" → run `npx gitnexus analyze` in terminal. + +## Checklist + +``` +- [ ] Understand the symptom (error message, unexpected behavior) +- [ ] gitnexus_query for error text or related code +- [ ] Identify the suspect function from returned processes +- [ ] gitnexus_context to see callers and callees +- [ ] Trace execution flow via process resource if applicable +- [ ] gitnexus_cypher for custom call chain traces if needed +- [ ] Read source files to confirm root cause +``` + +## Debugging Patterns + +| Symptom | GitNexus Approach | +| -------------------- | ---------------------------------------------------------- | +| Error message | `gitnexus_query` for error text → `context` on throw sites | +| Wrong return value | `context` on the function → trace callees for data flow | +| Intermittent failure | `context` → look for external calls, async deps | +| Performance issue | `context` → find symbols with many callers (hot paths) | +| Recent regression | `detect_changes` to see what your changes affect | + +## Tools + +**gitnexus_query** — find code related to error: + +``` +gitnexus_query({query: "payment validation error"}) +→ Processes: CheckoutFlow, ErrorHandling +→ Symbols: validatePayment, handlePaymentError, PaymentException +``` + +**gitnexus_context** — full context for a suspect: + +``` +gitnexus_context({name: "validatePayment"}) +→ Incoming calls: processCheckout, webhookHandler +→ Outgoing calls: verifyCard, fetchRates (external API!) +→ Processes: CheckoutFlow (step 3/7) +``` + +**gitnexus_cypher** — custom call chain traces: + +```cypher +MATCH path = (a)-[:CodeRelation {type: 'CALLS'}*1..2]->(b:Function {name: "validatePayment"}) +RETURN [n IN nodes(path) | n.name] AS chain +``` + +## Example: "Payment endpoint returns 500 intermittently" + +``` +1. gitnexus_query({query: "payment error handling"}) + → Processes: CheckoutFlow, ErrorHandling + → Symbols: validatePayment, handlePaymentError + +2. gitnexus_context({name: "validatePayment"}) + → Outgoing calls: verifyCard, fetchRates (external API!) + +3. READ gitnexus://repo/my-app/process/CheckoutFlow + → Step 3: validatePayment → calls fetchRates (external) + +4. Root cause: fetchRates calls external API without proper timeout +``` diff --git a/.claude/skills/gitnexus/gitnexus-exploring/SKILL.md b/.claude/skills/gitnexus/gitnexus-exploring/SKILL.md new file mode 100644 index 000000000..927a4e4b6 --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-exploring/SKILL.md @@ -0,0 +1,78 @@ +--- +name: gitnexus-exploring +description: "Use when the user asks how code works, wants to understand architecture, trace execution flows, or explore unfamiliar parts of the codebase. Examples: \"How does X work?\", \"What calls this function?\", \"Show me the auth flow\"" +--- + +# Exploring Codebases with GitNexus + +## When to Use + +- "How does authentication work?" +- "What's the project structure?" +- "Show me the main components" +- "Where is the database logic?" +- Understanding code you haven't seen before + +## Workflow + +``` +1. READ gitnexus://repos → Discover indexed repos +2. READ gitnexus://repo/{name}/context → Codebase overview, check staleness +3. gitnexus_query({query: ""}) → Find related execution flows +4. gitnexus_context({name: ""}) → Deep dive on specific symbol +5. READ gitnexus://repo/{name}/process/{name} → Trace full execution flow +``` + +> If step 2 says "Index is stale" → run `npx gitnexus analyze` in terminal. + +## Checklist + +``` +- [ ] READ gitnexus://repo/{name}/context +- [ ] gitnexus_query for the concept you want to understand +- [ ] Review returned processes (execution flows) +- [ ] gitnexus_context on key symbols for callers/callees +- [ ] READ process resource for full execution traces +- [ ] Read source files for implementation details +``` + +## Resources + +| Resource | What you get | +| --------------------------------------- | ------------------------------------------------------- | +| `gitnexus://repo/{name}/context` | Stats, staleness warning (~150 tokens) | +| `gitnexus://repo/{name}/clusters` | All functional areas with cohesion scores (~300 tokens) | +| `gitnexus://repo/{name}/cluster/{name}` | Area members with file paths (~500 tokens) | +| `gitnexus://repo/{name}/process/{name}` | Step-by-step execution trace (~200 tokens) | + +## Tools + +**gitnexus_query** — find execution flows related to a concept: + +``` +gitnexus_query({query: "payment processing"}) +→ Processes: CheckoutFlow, RefundFlow, WebhookHandler +→ Symbols grouped by flow with file locations +``` + +**gitnexus_context** — 360-degree view of a symbol: + +``` +gitnexus_context({name: "validateUser"}) +→ Incoming calls: loginHandler, apiMiddleware +→ Outgoing calls: checkToken, getUserById +→ Processes: LoginFlow (step 2/5), TokenRefresh (step 1/3) +``` + +## Example: "How does payment processing work?" + +``` +1. READ gitnexus://repo/my-app/context → 918 symbols, 45 processes +2. gitnexus_query({query: "payment processing"}) + → CheckoutFlow: processPayment → validateCard → chargeStripe + → RefundFlow: initiateRefund → calculateRefund → processRefund +3. gitnexus_context({name: "processPayment"}) + → Incoming: checkoutHandler, webhookHandler + → Outgoing: validateCard, chargeStripe, saveTransaction +4. Read src/payments/processor.ts for implementation details +``` diff --git a/.claude/skills/gitnexus/gitnexus-guide/SKILL.md b/.claude/skills/gitnexus/gitnexus-guide/SKILL.md new file mode 100644 index 000000000..937ac73d1 --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-guide/SKILL.md @@ -0,0 +1,64 @@ +--- +name: gitnexus-guide +description: "Use when the user asks about GitNexus itself — available tools, how to query the knowledge graph, MCP resources, graph schema, or workflow reference. Examples: \"What GitNexus tools are available?\", \"How do I use GitNexus?\"" +--- + +# GitNexus Guide + +Quick reference for all GitNexus MCP tools, resources, and the knowledge graph schema. + +## Always Start Here + +For any task involving code understanding, debugging, impact analysis, or refactoring: + +1. **Read `gitnexus://repo/{name}/context`** — codebase overview + check index freshness +2. **Match your task to a skill below** and **read that skill file** +3. **Follow the skill's workflow and checklist** + +> If step 1 warns the index is stale, run `npx gitnexus analyze` in the terminal first. + +## Skills + +| Task | Skill to read | +| -------------------------------------------- | ------------------- | +| Understand architecture / "How does X work?" | `gitnexus-exploring` | +| Blast radius / "What breaks if I change X?" | `gitnexus-impact-analysis` | +| Trace bugs / "Why is X failing?" | `gitnexus-debugging` | +| Rename / extract / split / refactor | `gitnexus-refactoring` | +| Tools, resources, schema reference | `gitnexus-guide` (this file) | +| Index, status, clean, wiki CLI commands | `gitnexus-cli` | + +## Tools Reference + +| Tool | What it gives you | +| ---------------- | ------------------------------------------------------------------------ | +| `query` | Process-grouped code intelligence — execution flows related to a concept | +| `context` | 360-degree symbol view — categorized refs, processes it participates in | +| `impact` | Symbol blast radius — what breaks at depth 1/2/3 with confidence | +| `detect_changes` | Git-diff impact — what do your current changes affect | +| `rename` | Multi-file coordinated rename with confidence-tagged edits | +| `cypher` | Raw graph queries (read `gitnexus://repo/{name}/schema` first) | +| `list_repos` | Discover indexed repos | + +## Resources Reference + +Lightweight reads (~100-500 tokens) for navigation: + +| Resource | Content | +| ---------------------------------------------- | ----------------------------------------- | +| `gitnexus://repo/{name}/context` | Stats, staleness check | +| `gitnexus://repo/{name}/clusters` | All functional areas with cohesion scores | +| `gitnexus://repo/{name}/cluster/{clusterName}` | Area members | +| `gitnexus://repo/{name}/processes` | All execution flows | +| `gitnexus://repo/{name}/process/{processName}` | Step-by-step trace | +| `gitnexus://repo/{name}/schema` | Graph schema for Cypher | + +## Graph Schema + +**Nodes:** File, Function, Class, Interface, Method, Community, Process +**Edges (via CodeRelation.type):** CALLS, IMPORTS, EXTENDS, IMPLEMENTS, DEFINES, MEMBER_OF, STEP_IN_PROCESS + +```cypher +MATCH (caller)-[:CodeRelation {type: 'CALLS'}]->(f:Function {name: "myFunc"}) +RETURN caller.name, caller.filePath +``` diff --git a/.claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md b/.claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md new file mode 100644 index 000000000..e19af280c --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md @@ -0,0 +1,97 @@ +--- +name: gitnexus-impact-analysis +description: "Use when the user wants to know what will break if they change something, or needs safety analysis before editing code. Examples: \"Is it safe to change X?\", \"What depends on this?\", \"What will break?\"" +--- + +# Impact Analysis with GitNexus + +## When to Use + +- "Is it safe to change this function?" +- "What will break if I modify X?" +- "Show me the blast radius" +- "Who uses this code?" +- Before making non-trivial code changes +- Before committing — to understand what your changes affect + +## Workflow + +``` +1. gitnexus_impact({target: "X", direction: "upstream"}) → What depends on this +2. READ gitnexus://repo/{name}/processes → Check affected execution flows +3. gitnexus_detect_changes() → Map current git changes to affected flows +4. Assess risk and report to user +``` + +> If "Index is stale" → run `npx gitnexus analyze` in terminal. + +## Checklist + +``` +- [ ] gitnexus_impact({target, direction: "upstream"}) to find dependents +- [ ] Review d=1 items first (these WILL BREAK) +- [ ] Check high-confidence (>0.8) dependencies +- [ ] READ processes to check affected execution flows +- [ ] gitnexus_detect_changes() for pre-commit check +- [ ] Assess risk level and report to user +``` + +## Understanding Output + +| Depth | Risk Level | Meaning | +| ----- | ---------------- | ------------------------ | +| d=1 | **WILL BREAK** | Direct callers/importers | +| d=2 | LIKELY AFFECTED | Indirect dependencies | +| d=3 | MAY NEED TESTING | Transitive effects | + +## Risk Assessment + +| Affected | Risk | +| ------------------------------ | -------- | +| <5 symbols, few processes | LOW | +| 5-15 symbols, 2-5 processes | MEDIUM | +| >15 symbols or many processes | HIGH | +| Critical path (auth, payments) | CRITICAL | + +## Tools + +**gitnexus_impact** — the primary tool for symbol blast radius: + +``` +gitnexus_impact({ + target: "validateUser", + direction: "upstream", + minConfidence: 0.8, + maxDepth: 3 +}) + +→ d=1 (WILL BREAK): + - loginHandler (src/auth/login.ts:42) [CALLS, 100%] + - apiMiddleware (src/api/middleware.ts:15) [CALLS, 100%] + +→ d=2 (LIKELY AFFECTED): + - authRouter (src/routes/auth.ts:22) [CALLS, 95%] +``` + +**gitnexus_detect_changes** — git-diff based impact analysis: + +``` +gitnexus_detect_changes({scope: "staged"}) + +→ Changed: 5 symbols in 3 files +→ Affected: LoginFlow, TokenRefresh, APIMiddlewarePipeline +→ Risk: MEDIUM +``` + +## Example: "What breaks if I change validateUser?" + +``` +1. gitnexus_impact({target: "validateUser", direction: "upstream"}) + → d=1: loginHandler, apiMiddleware (WILL BREAK) + → d=2: authRouter, sessionManager (LIKELY AFFECTED) + +2. READ gitnexus://repo/my-app/processes + → LoginFlow and TokenRefresh touch validateUser + +3. Risk: 2 direct callers, 2 processes = MEDIUM +``` diff --git a/.claude/skills/gitnexus/gitnexus-refactoring/SKILL.md b/.claude/skills/gitnexus/gitnexus-refactoring/SKILL.md new file mode 100644 index 000000000..f48cc01bd --- /dev/null +++ b/.claude/skills/gitnexus/gitnexus-refactoring/SKILL.md @@ -0,0 +1,121 @@ +--- +name: gitnexus-refactoring +description: "Use when the user wants to rename, extract, split, move, or restructure code safely. Examples: \"Rename this function\", \"Extract this into a module\", \"Refactor this class\", \"Move this to a separate file\"" +--- + +# Refactoring with GitNexus + +## When to Use + +- "Rename this function safely" +- "Extract this into a module" +- "Split this service" +- "Move this to a new file" +- Any task involving renaming, extracting, splitting, or restructuring code + +## Workflow + +``` +1. gitnexus_impact({target: "X", direction: "upstream"}) → Map all dependents +2. gitnexus_query({query: "X"}) → Find execution flows involving X +3. gitnexus_context({name: "X"}) → See all incoming/outgoing refs +4. Plan update order: interfaces → implementations → callers → tests +``` + +> If "Index is stale" → run `npx gitnexus analyze` in terminal. + +## Checklists + +### Rename Symbol + +``` +- [ ] gitnexus_rename({symbol_name: "oldName", new_name: "newName", dry_run: true}) — preview all edits +- [ ] Review graph edits (high confidence) and ast_search edits (review carefully) +- [ ] If satisfied: gitnexus_rename({..., dry_run: false}) — apply edits +- [ ] gitnexus_detect_changes() — verify only expected files changed +- [ ] Run tests for affected processes +``` + +### Extract Module + +``` +- [ ] gitnexus_context({name: target}) — see all incoming/outgoing refs +- [ ] gitnexus_impact({target, direction: "upstream"}) — find all external callers +- [ ] Define new module interface +- [ ] Extract code, update imports +- [ ] gitnexus_detect_changes() — verify affected scope +- [ ] Run tests for affected processes +``` + +### Split Function/Service + +``` +- [ ] gitnexus_context({name: target}) — understand all callees +- [ ] Group callees by responsibility +- [ ] gitnexus_impact({target, direction: "upstream"}) — map callers to update +- [ ] Create new functions/services +- [ ] Update callers +- [ ] gitnexus_detect_changes() — verify affected scope +- [ ] Run tests for affected processes +``` + +## Tools + +**gitnexus_rename** — automated multi-file rename: + +``` +gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: true}) +→ 12 edits across 8 files +→ 10 graph edits (high confidence), 2 ast_search edits (review) +→ Changes: [{file_path, edits: [{line, old_text, new_text, confidence}]}] +``` + +**gitnexus_impact** — map all dependents first: + +``` +gitnexus_impact({target: "validateUser", direction: "upstream"}) +→ d=1: loginHandler, apiMiddleware, testUtils +→ Affected Processes: LoginFlow, TokenRefresh +``` + +**gitnexus_detect_changes** — verify your changes after refactoring: + +``` +gitnexus_detect_changes({scope: "all"}) +→ Changed: 8 files, 12 symbols +→ Affected processes: LoginFlow, TokenRefresh +→ Risk: MEDIUM +``` + +**gitnexus_cypher** — custom reference queries: + +```cypher +MATCH (caller)-[:CodeRelation {type: 'CALLS'}]->(f:Function {name: "validateUser"}) +RETURN caller.name, caller.filePath ORDER BY caller.filePath +``` + +## Risk Rules + +| Risk Factor | Mitigation | +| ------------------- | ----------------------------------------- | +| Many callers (>5) | Use gitnexus_rename for automated updates | +| Cross-area refs | Use detect_changes after to verify scope | +| String/dynamic refs | gitnexus_query to find them | +| External/public API | Version and deprecate properly | + +## Example: Rename `validateUser` to `authenticateUser` + +``` +1. gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: true}) + → 12 edits: 10 graph (safe), 2 ast_search (review) + → Files: validator.ts, login.ts, middleware.ts, config.json... + +2. Review ast_search edits (config.json: dynamic reference!) + +3. gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: false}) + → Applied 12 edits across 8 files + +4. gitnexus_detect_changes({scope: "all"}) + → Affected: LoginFlow, TokenRefresh + → Risk: MEDIUM — run tests for these flows +``` diff --git a/AGENTS.md b/AGENTS.md index 6deaa4055..e824d756b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -513,10 +513,14 @@ Language: Nim 2.x | License: MIT or Apache 2.0 Note: For specific version requirements, check `logos_delivery.nimble`. +## Working with nim-brokers + +@nim_brokers_instructions.md + # GitNexus — Code Intelligence -This project is indexed by GitNexus as **logos-delivery** (2076 symbols, 2564 relationships, 12 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. +This project is indexed by GitNexus as **logos-delivery** (13906 symbols, 23819 relationships, 294 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. diff --git a/CLAUDE.md b/CLAUDE.md index 43c994c2d..c0a276a47 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,45 @@ @AGENTS.md + + +# GitNexus — Code Intelligence + +This project is indexed by GitNexus as **logos-delivery** (13906 symbols, 23819 relationships, 294 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. + +> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. + +## Always Do + +- **MUST run impact analysis before editing any symbol.** Before modifying a function, class, or method, run `gitnexus_impact({target: "symbolName", direction: "upstream"})` and report the blast radius (direct callers, affected processes, risk level) to the user. +- **MUST run `gitnexus_detect_changes()` before committing** to verify your changes only affect expected symbols and execution flows. +- **MUST warn the user** if impact analysis returns HIGH or CRITICAL risk before proceeding with edits. +- When exploring unfamiliar code, use `gitnexus_query({query: "concept"})` to find execution flows instead of grepping. It returns process-grouped results ranked by relevance. +- When you need full context on a specific symbol — callers, callees, which execution flows it participates in — use `gitnexus_context({name: "symbolName"})`. + +## Never Do + +- NEVER edit a function, class, or method without first running `gitnexus_impact` on it. +- NEVER ignore HIGH or CRITICAL risk warnings from impact analysis. +- NEVER rename symbols with find-and-replace — use `gitnexus_rename` which understands the call graph. +- NEVER commit changes without running `gitnexus_detect_changes()` to check affected scope. + +## Resources + +| Resource | Use for | +|----------|---------| +| `gitnexus://repo/logos-delivery/context` | Codebase overview, check index freshness | +| `gitnexus://repo/logos-delivery/clusters` | All functional areas | +| `gitnexus://repo/logos-delivery/processes` | All execution flows | +| `gitnexus://repo/logos-delivery/process/{name}` | Step-by-step execution trace | + +## CLI + +| Task | Read this skill file | +|------|---------------------| +| Understand architecture / "How does X work?" | `.claude/skills/gitnexus/gitnexus-exploring/SKILL.md` | +| Blast radius / "What breaks if I change X?" | `.claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md` | +| Trace bugs / "Why is X failing?" | `.claude/skills/gitnexus/gitnexus-debugging/SKILL.md` | +| Rename / extract / split / refactor | `.claude/skills/gitnexus/gitnexus-refactoring/SKILL.md` | +| Tools, resources, schema reference | `.claude/skills/gitnexus/gitnexus-guide/SKILL.md` | +| Index, status, clean, wiki CLI commands | `.claude/skills/gitnexus/gitnexus-cli/SKILL.md` | + + diff --git a/Makefile b/Makefile index daa8a8ad7..30e39a739 100644 --- a/Makefile +++ b/Makefile @@ -416,7 +416,7 @@ docker-liteprotocoltester-push: ################ ## C Bindings ## ################ -.PHONY: cbindings cwaku_example liblogosdelivery liblogosdelivery_example +.PHONY: cbindings cwaku_example libwaku liblogosdelivery liblogosdelivery_example onelogosdelivery detected_OS ?= Linux ifeq ($(OS),Windows_NT) @@ -450,6 +450,12 @@ else $(NIMBLE) --verbose liblogosdelivery$(BUILD_COMMAND) logos_delivery.nimble endif +# New single-root Logos Delivery library (BrokerFfiApi + all wrappers), built from +# ./logos_delivery.nim. Single cross-platform task (no $(BUILD_COMMAND) suffix). +# Set SRCGEN=1 to also dump the broker-generated sources (-d:brokerDebug). +onelogosdelivery: | build-deps librln + $(NIMBLE) --verbose onelogosdelivery logos_delivery.nimble + logosdelivery_example: | build liblogosdelivery @echo -e $(BUILD_MSG) "build/$@" ifeq ($(detected_OS),Darwin) diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index 9e96858db..624c9b081 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -67,11 +67,11 @@ when isMainModule: if args.ethRpcEndpoint == "": # Create a basic configuration for the Waku node # No RLN as we don't have an ETH RPC Endpoint - conf.mode = Core + conf.mode = some(WakuMode.Core) conf.preset = "logos.dev" else: # Connect to TWN, use ETH RPC Endpoint for RLN - conf.mode = Core + conf.mode = some(WakuMode.Core) conf.preset = "twn" conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)] diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index 58785e80d..33ecabd75 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -115,9 +115,9 @@ proc logosdelivery_start_node( chronicles.error "MessageReceivedEvent.listen failed", err = $error return err("MessageReceivedEvent.listen failed: " & $error) - let ConnectionStatusChangeListener = EventConnectionStatusChange.listen( + let ConnectionStatusChangeListener = ConnectionStatusChangeEvent.listen( ctx.myLib[].brokerCtx, - proc(event: EventConnectionStatusChange) {.async: (raises: []).} = + proc(event: ConnectionStatusChangeEvent) {.async: (raises: []).} = callEventCallback(ctx, "onConnectionStatusChange"): $newJsonEvent("connection_status_change", event), ).valueOr: @@ -150,7 +150,7 @@ proc logosdelivery_stop_node( await MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx) await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx) await MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx) - await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx) + await ConnectionStatusChangeEvent.dropAllListeners(ctx.myLib[].brokerCtx) (await ctx.myLib[].stop()).isOkOr: let errMsg = $error diff --git a/logos_delivery.nim b/logos_delivery.nim index c4edbe6d6..5ed465e43 100644 --- a/logos_delivery.nim +++ b/logos_delivery.nim @@ -8,3 +8,28 @@ export api import logos_delivery/waku/factory/waku export waku + +import logos_delivery/api/logos_delivery_interface +export logos_delivery_interface + +import logos_delivery/logos_delivery + +import brokers/api_library # registerBrokerLibrary + +# `git_version` is exported as a `{.strdefine.}` by several modules in the graph +# (waku.nim, waku_node.nim, nim-ffi), so it's ambiguous unqualified. Pin to +# waku's and expose an unambiguous local const for registerBrokerLibrary; the +# build injects `-d:git_version="$(git describe …)"`. +const ldGitVersion = waku.git_version + +registerBrokerLibrary: + name: + "logosdelivery" + version: + ldGitVersion + mainClass: + LogosDeliveryInterface + initializeRequest: + StartAsClient + shutdownRequest: + Shutdown diff --git a/logos_delivery.nimble b/logos_delivery.nimble index efe118595..feb937c6b 100644 --- a/logos_delivery.nimble +++ b/logos_delivery.nimble @@ -1,6 +1,10 @@ #!fmt: off import os +import std/strutils + # strip() is used in buildLibDynamicMac/buildLibStaticMac (above the original + # `import std/strutils` lower in this file); needed at the top when this nimble + # is evaluated as the package script while compiling ./logos_delivery.nim. mode = ScriptMode.Verbose ### Package @@ -65,7 +69,7 @@ requires "https://github.com/logos-messaging/nim-ffi#v0.1.3" requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5" -requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1" +requires "https://github.com/NagyZoltanPeter/nim-brokers.git#cf5ee65cc20211068d7191de7e5e177c0dc212fa" requires "https://github.com/vacp2p/nim-lsquic.git#v0.5.1" requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2" @@ -504,6 +508,49 @@ task liblogosdeliveryStaticLinux, "Generate bindings": task liblogosdeliveryStaticMac, "Generate bindings": buildLibStaticMac("liblogosdelivery", "library") +## New single-root Logos Delivery library (BrokerFfiApi). +## Builds ./logos_delivery.nim (registerBrokerLibrary name "logosdelivery") as a +## shared library with the full set of FFI wrappers (C + C++ headers always, plus +## Python, Rust and Go). Set the SRCGEN env var to also dump the broker-generated +## sources (-d:brokerDebug). Independent of the original liblogosdelivery targets. +task onelogosdelivery, + "Build the new single-root liblogosdelivery as a BrokerFfiApi library with all FFI wrappers (C/C++/Python/Rust/Go)": + let outdir = "build/onelogosdelivery" + if not dirExists(outdir): + mkDir(outdir) + + # SRCGEN env var present -> persist the generated *.gen.nim under build/broker_debug/. + let srcGenFlag = + if existsEnv("SRCGEN"): " -d:brokerDebug" else: "" + + let libExt = + when defined(windows): ".dll" + elif defined(macosx): ".dylib" + else: ".so" + + # Full --out path keeps the shared lib in outdir (a bare basename lands in cwd). + exec "nim c" & + " -d:BrokerFfiApi" & + " -d:BrokerFfiApiGenPy -d:BrokerFfiApiGenRust -d:BrokerFfiApiGenGo" & + " --threads:on --app:lib --opt:speed --mm:refc" & + " -d:metrics -d:discv5_protocol_id=d5waku" & + " --nimMainPrefix:logosdelivery" & " --out:" & outdir & "/liblogosdelivery" & libExt & + srcGenFlag & " " & getMyCPU() & getNimParams() & " ./logos_delivery.nim" + + # The generated FFI wrappers land next to the source (repo root): brokers v3.1.2's + # `-d:BrokerFfiApiOutDir` is referenced but never declared `{.strdefine.}`, so it + # cannot redirect them. Move them into outdir as a post-build step to keep the + # repo root clean. + for f in [ + "logosdelivery.h", "logosdelivery.hpp", "logosdelivery.py", "logosdelivery.cddl", + "logosdeliveryConfig.cmake", "logosdeliveryConfigVersion.cmake", + ]: + if fileExists(f): + mvFile(f, outdir & "/" & f) + for d in ["logosdelivery_rs", "logosdelivery_go"]: + if dirExists(d): + mvDir(d, outdir & "/" & d) + ### Formatting tasks task nphchanges, "Run nph on .nim/.nims/.nimble files changed on this branch/PR": diff --git a/logos_delivery/api/kernel_interface.nim b/logos_delivery/api/kernel_interface.nim new file mode 100644 index 000000000..f83ee8843 --- /dev/null +++ b/logos_delivery/api/kernel_interface.nim @@ -0,0 +1,182 @@ +## KernelInterface — the libwaku operation set, exposed with the real Nim typed +## signatures behind each entry point (NOT the flat string/JSON FFI contract). +## +## WakuMessage / PubsubTopic / ContentTopic / StoreQueryRequest+Response cross +## the broker natively. libp2p identity / ENR values cross as their canonical +## string forms (they are ref/opaque and cannot be auto-serialised). Inbound +## relay/filter delivery is the `ReceivedMessage` event (replaces libwaku's +## set_event_callback for messages). +## +## Broker constraints (v3.1.0): every request returns `Result[T, string]` — the +## error channel is always `string`; the payload `T` must NOT be `void` (the mt +## codec cannot encode `void`), so "no value" requests return `Result[bool, string]`. + +import std/options +import results, chronos +import brokers/broker_interface + +import logos_delivery/api/types + +import + logos_delivery/waku/waku_core, # PubsubTopic + logos_delivery/waku/waku_store/common # StoreQueryRequest, StoreQueryResponse + +export types + +BrokerInterface(API, KernelInterface): + EventBroker: + type ReceivedMessage = object + ## Inbound relay/filter message delivery (replaces set_event_callback). + pubsubTopic*: PubsubTopic + message*: WakuMessage + + # --- topic construction --- + RequestBroker: + proc buildContentTopic( + appName: string, appVersion: uint32, name: string, encoding: string + ): Future[Result[ContentTopic, string]] {.async.} + + RequestBroker: + proc buildPubsubTopic( + topicName: string + ): Future[Result[PubsubTopic, string]] {.async.} + + RequestBroker: + proc defaultPubsubTopic(): Future[Result[PubsubTopic, string]] {.async.} + + # --- relay --- + RequestBroker: + proc relayPublish( + pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 + ): Future[Result[int, string]] {.async.} + + RequestBroker: + proc relaySubscribe( + pubsubTopic: PubsubTopic + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc relayUnsubscribe( + pubsubTopic: PubsubTopic + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc relayAddProtectedShard( + clusterId: uint16, shardId: uint16, publicKey: string + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc relayConnectedPeers( + pubsubTopic: PubsubTopic + ): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc relayPeersInMesh( + pubsubTopic: PubsubTopic + ): Future[Result[seq[string], string]] {.async.} + + # --- filter --- + RequestBroker: + proc filterSubscribe( + pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc filterUnsubscribe( + pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc filterUnsubscribeAll(peer: string): Future[Result[bool, string]] {.async.} + + # --- lightpush --- + RequestBroker: + proc lightpushPublish( + pubsubTopic: PubsubTopic, message: WakuMessage, peer: string + ): Future[Result[string, string]] {.async.} + + # --- store --- + RequestBroker: + proc storeQuery( + request: StoreQueryRequest, peer: string, timeoutMs: int + ): Future[Result[StoreQueryResponse, string]] {.async.} + + # --- peer management --- + RequestBroker: + proc connect( + peers: seq[string], timeoutMs: uint32 + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc disconnectPeerById(peerId: string): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc disconnectAllPeers(): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc dialPeer( + peerAddr: string, protocol: string, timeoutMs: int + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc dialPeerById( + peerId: string, protocol: string, timeoutMs: int + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc peerIdsFromPeerstore(): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc connectedPeersInfo(): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc connectedPeers(): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc peerIdsByProtocol( + protocol: string + ): Future[Result[seq[string], string]] {.async.} + + # --- discovery --- + RequestBroker: + proc dnsDiscovery( + enrTreeUrl: string, nameServer: string, timeoutMs: int + ): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc discv5UpdateBootnodes( + bootnodes: seq[string] + ): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc startDiscv5(): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc stopDiscv5(): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc peerExchangeRequest(numPeers: uint64): Future[Result[int, string]] {.async.} + + # --- debug / info --- + RequestBroker: + proc version(): Future[Result[string, string]] {.async.} + + RequestBroker: + proc listenAddresses(): Future[Result[seq[string], string]] {.async.} + + RequestBroker: + proc myEnr(): Future[Result[string, string]] {.async.} + + RequestBroker: + proc myPeerId(): Future[Result[string, string]] {.async.} + + RequestBroker: + proc metrics(): Future[Result[string, string]] {.async.} + + RequestBroker: + proc isOnline(): Future[Result[bool, string]] {.async.} + + RequestBroker: + proc pingPeer( + peerAddr: string, timeoutMs: int + ): Future[Result[int64, string]] {.async.} diff --git a/logos_delivery/api/logos_delivery_interface.nim b/logos_delivery/api/logos_delivery_interface.nim new file mode 100644 index 000000000..49b13adb2 --- /dev/null +++ b/logos_delivery/api/logos_delivery_interface.nim @@ -0,0 +1,51 @@ +## LogosDeliveryInterface — the facade / mainClass interface. +## +## Owns the node and the three sub-interfaces; getters return them. This module +## imports AND re-exports the three sub-interface contracts so a consumer can +## `import ./LogosDeliveryInterface` and get the whole interface surface. + +import results, chronos +import brokers/broker_interface + +# Module aliases avoid the module-name vs type-name clash (file KernelInterface.nim +# exports type KernelInterface); the type names stay in scope unqualified for the getters. +import ./kernel_interface as ikernel_iface +import ./messaging_client_interface as imessagingclient_iface +import ./reliable_channel_manager_interface as ireliablechannelmanager_iface + +export ikernel_iface, imessagingclient_iface, ireliablechannelmanager_iface + +BrokerInterface(API, LogosDeliveryInterface): + EventBroker: + type ConnectionStatusChangeEvent* = object + connectionStatus*: ConnectionStatus + + RequestBroker: + proc startAsNode(config: string): Future[Result[void, string]] {.async.} + + RequestBroker: + proc startAsClient( + mode: WakuMode, preset: string + ): Future[Result[MessagingClientInterface, string]] {.async.} + + RequestBroker: + proc shutdown(): Future[Result[void, string]] {.async.} + + RequestBroker: + proc stop(): Future[Result[void, string]] {.async.} + + RequestBroker: + # Getters: return the owned sub-interface instances. + proc kernel(): Future[Result[KernelInterface, string]] {.async.} + + RequestBroker: + proc messaging(): Future[Result[MessagingClientInterface, string]] {.async.} + + RequestBroker: + proc channels(): Future[Result[ReliableChannelManagerInterface, string]] {.async.} + + RequestBroker: + proc getNodeInfo(id: NodeInfoId): Future[Result[string, string]] {.async.} + + RequestBroker: + proc getAvailableConfigs(): Future[Result[string, string]] {.async.} diff --git a/logos_delivery/api/messaging_client_interface.nim b/logos_delivery/api/messaging_client_interface.nim new file mode 100644 index 000000000..b49bd3193 --- /dev/null +++ b/logos_delivery/api/messaging_client_interface.nim @@ -0,0 +1,48 @@ +## MessagingClientInterface — the messaging API (waku/api) minus createNode. +## +## Node creation lives in the facade (LogosDeliveryInterface); this interface exposes +## subscribe / unsubscribe / send only. + +import results, chronos +import brokers/broker_interface + +import logos_delivery/api/types +export types + +BrokerInterface(API, MessagingClientInterface): + EventBroker: + # Event emitted when a message is sent to the network + type MessageSentEvent* = object + requestId*: RequestId + messageHash*: string + + EventBroker: + # Event emitted when a message send operation fails + type MessageErrorEvent* = object + requestId*: RequestId + messageHash*: string + error*: string + + EventBroker: + # Confirmation that a message has been correctly delivered to some neighbouring nodes. + type MessagePropagatedEvent* = object + requestId*: RequestId + messageHash*: string + + EventBroker: + # Event emitted when a message is received via Waku + type MessageReceivedEvent* = object + messageHash*: string + message*: WakuMessage + + RequestBroker: + proc subscribe(contentTopic: ContentTopic): Future[Result[void, string]] {.async.} + + RequestBroker: + proc unsubscribe(contentTopic: ContentTopic): Future[Result[void, string]] {.async.} + + RequestBroker: + # Returns the RequestId in its string form. Named `sendMessage` (not `send`) + # because broker request verbs must be globally unique across all interfaces + # in the library, and ReliableChannelManagerInterface also has a send. + proc send(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} diff --git a/logos_delivery/api/reliable_channel_manager_interface.nim b/logos_delivery/api/reliable_channel_manager_interface.nim new file mode 100644 index 000000000..556b24738 --- /dev/null +++ b/logos_delivery/api/reliable_channel_manager_interface.nim @@ -0,0 +1,48 @@ +## ReliableChannelManagerInterface — create / close / send on a reliable channel, +## plus a MessageReceived event (bridged from the channel layer's own +## ChannelMessageReceivedEvent by the impl). + +import results, chronos +import brokers/broker_interface + +import logos_delivery/api/types +export types + +BrokerInterface(API, ReliableChannelManagerInterface): + EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] + + EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + + EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string + + RequestBroker: + # Returns the channel id of the created channel. + proc createReliableChannel( + channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID + ): Future[Result[ChannelId, string]] {.async.} + + RequestBroker: + proc closeChannel(channelId: ChannelId): Future[Result[void, string]] {.async.} + + RequestBroker: + # Returns the RequestId in its string form. Named `sendOnChannel` (not `send`) + # for the global-verb-uniqueness reason noted on MessagingClientInterface.sendMessage. + proc sendOnChannel( + channelId: ChannelId, payload: seq[byte], ephemeral: bool + ): Future[Result[RequestId, string]] {.async.} diff --git a/logos_delivery/api/types.nim b/logos_delivery/api/types.nim new file mode 100644 index 000000000..addd44233 --- /dev/null +++ b/logos_delivery/api/types.nim @@ -0,0 +1,129 @@ +## logos_delivery/api/types — shared API type definitions. +## +## These derived (non-builtin) types are referenced by the BrokerInterface +## contracts (LogosDeliveryInterface / MessagingClientInterface / +## ReliableChannelManagerInterface) and were elevated here from their original +## modules so the API surface has a single types home. Each original module now +## imports this module and re-exports the moved entity, so existing call sites +## are unaffected. +## +## NOTE (accepted layering inversion): `WakuMessage` and `ContentTopic` were +## physically moved out of `waku_core/`, so `waku_core/message` now depends on +## this module (which pulls in nim-sds via the channel id types). + +{.push raises: [].} + +import bearssl/rand, std/times, chronos +import stew/byteutils +import std/hashes + +import + logos_delivery/waku/utils/requests as request_utils + # generateRequestId (used by RequestId.new); lost when the procs were + # consolidated here during the type-elevation refactor. + +import + logos_delivery/waku/waku_core/time + # Timestamp: TODO: this needs to be elevated into interface level. + +import types/sds_message_id # nim-sds: SdsChannelID, SdsParticipantID (leaf) + +# SdsParticipantID (and SdsChannelID, needed by ChannelId) are NOT moved — they +# belong to nim-sds; they are imported and explicitly re-exported here. +export sds_message_id + +type + ContentTopic* = string + + RequestId* = distinct string + + ConnectionStatus* {.pure.} = enum + Disconnected + PartiallyConnected + Connected + + WakuMode* {.pure.} = enum + Core # full service node + Edge # client-only node + + NodeInfoId* {.pure.} = enum + Version + Metrics + MyMultiaddresses + MyENR + MyPeerId + MyBoundPorts + MyMixPubKey + + ChannelId* = SdsChannelID + + MessageEnvelope* = object + contentTopic*: ContentTopic + payload*: seq[byte] + ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. + + WakuMessage* = object # Data payload transmitted. + payload*: seq[byte] + contentTopic*: ContentTopic ## content-based filtering identifier + meta*: seq[byte] ## application specific metadata + version*: uint32 ## payload-encryption discriminator (Whisper/WakuV1 compat) + timestamp*: Timestamp ## sender generated timestamp + ephemeral*: bool ## transient (not-to-be-stored) marker + proof*: seq[byte] ## RFC 17 spam-protection proof (rln-relay) + +## ===== helpers ===== +## +proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T = + ## Generate a new RequestId using the provided RNG. + RequestId(request_utils.generateRequestId(rng)) + +proc `$`*(r: RequestId): string {.inline.} = + string(r) + +proc `==`*(a, b: RequestId): bool {.inline.} = + string(a) == string(b) + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) + +proc generateRequestId*(rng: ref HmacDrbgContext): RequestId = + var bytes: array[10, byte] + hmacDrbgGenerate(rng[], bytes) + return RequestId(byteutils.toHex(bytes)) + +proc init*( + T: type MessageEnvelope, + contentTopic: ContentTopic, + payload: seq[byte] | string, + ephemeral: bool = false, + meta: seq[byte] = @[], +): MessageEnvelope = + when payload is seq[byte]: + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) + else: + MessageEnvelope( + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, + ) + +proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = + ## Convert a MessageEnvelope to a WakuMessage. + var wm = WakuMessage( + contentTopic: envelope.contentTopic, + payload: envelope.payload, + ephemeral: envelope.ephemeral, + meta: envelope.meta, + timestamp: getNowInNanosecondTime(), + ) + return wm + +{.pop.} diff --git a/logos_delivery/channels/events.nim b/logos_delivery/channels/events.nim index 5f69095e4..36108f18e 100644 --- a/logos_delivery/channels/events.nim +++ b/logos_delivery/channels/events.nim @@ -13,27 +13,7 @@ import logos_delivery/waku/events/message_events as waku_message_events import brokers/event_broker import ./types as channel_types +import logos_delivery/api/reliable_channel_manager_interface -export waku_message_events, channel_types, event_broker - -EventBroker: - type ChannelMessageReceivedEvent* = object - channelId*: ChannelId - senderId*: SdsParticipantID - payload*: seq[byte] - -EventBroker: - ## Emitted when every segment of a channel-level `send()` reached - ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the - ## `requestId` is the channel-layer parent returned by `send()`. - type ChannelMessageSentEvent* = object - channelId*: ChannelId - requestId*: RequestId - -EventBroker: - ## Emitted when a channel-level `send()` finalises with at least one - ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. - type ChannelMessageErrorEvent* = object - channelId*: ChannelId - requestId*: RequestId - error*: string +export + waku_message_events, channel_types, event_broker, reliable_channel_manager_interface diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 5a7ab24d4..e1a26440e 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -21,8 +21,8 @@ import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto +import logos_delivery/api/messaging_client_interface as mci import logos_delivery/waku/api/types -import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics import ./events @@ -31,9 +31,7 @@ import ./scalable_data_sync/scalable_data_sync import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption -export - types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager, - encryption +export types, events, segmentation, scalable_data_sync, rate_limit_manager, encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -44,14 +42,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## on breaking on-the-wire changes; implementations pin one version. type - SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} - ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; - ## tests inject a fake that records calls and returns canned - ## `RequestId`s so the send state machine can be exercised end-to-end - ## without a network. - MessagePersistence {.pure.} = enum Persistent Ephemeral @@ -90,7 +80,7 @@ type ## Spec-defined public type. Fields are private so callers cannot ## mutate internals and break invariants. Getters are added below ## for the few values consumers may need. - sendHandler: SendHandler + messagingClient: MessagingClientInterface channelId: ChannelId contentTopic: ContentTopic senderId: SdsParticipantID @@ -129,13 +119,15 @@ proc stop*(self: ReliableChannel) {.async: (raises: []).} = ## Stops the SDS background loops. Persisted SDS state survives. await self.sdsHandler.stop() -proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) = +proc tryFinalizeChannelReq( + self: ReliableChannel, channelReqId: RequestId +) {.async: (raises: []).} = ## Tries to finalize the channel-level request identified by `channelReqId` if ## certain conditions are met, i.e., no segments are still awaiting dispatch or in flight, ## and the total number of confirmed + failed segments equals the total expected segments. ## Therefore, the channel-level request is removed from `self.channelReqs` ## and the appropriate final event is emitted. - ## + ## let state = self.channelReqs.getOrDefault(channelReqId) if state.totalExpectedSegments == 0: ## Either already finalized (and removed) or never inserted. @@ -148,18 +140,15 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) = self.channelReqs.del(channelReqId) if state.failedCount > 0: - ChannelMessageErrorEvent.emit( + await ChannelMessageErrorEvent.emit( self.brokerCtx, - ChannelMessageErrorEvent( - channelId: self.channelId, - requestId: channelReqId, - error: "one or more segments failed", - ), + channelId = self.channelId, + requestId = channelReqId, + error = "one or more segments failed", ) else: - ChannelMessageSentEvent.emit( - self.brokerCtx, - ChannelMessageSentEvent(channelId: self.channelId, requestId: channelReqId), + await ChannelMessageSentEvent.emit( + self.brokerCtx, channelId = self.channelId, requestId = channelReqId ) type ClaimedSegment = object @@ -184,7 +173,7 @@ type MessagingOutcome {.pure.} = enum proc onMessageFinal( self: ReliableChannel, messagingReqId: RequestId, outcome: MessagingOutcome -) = +) {.async.} = for channelReqId, state in self.channelReqs.mpairs: let idx = state.inflightMessagingIds.find(messagingReqId) if idx < 0: @@ -195,17 +184,19 @@ proc onMessageFinal( state.confirmedCount.inc() of MessagingOutcome.Failed: state.failedCount.inc() - self.tryFinalizeChannelReq(channelReqId) + await self.tryFinalizeChannelReq(channelReqId) return -proc markSegmentFailed(self: ReliableChannel, channelReqId: RequestId) = +proc markSegmentFailed( + self: ReliableChannel, channelReqId: RequestId +) {.async: (raises: []).} = try: self.channelReqs[channelReqId].failedCount.inc() except KeyError as e: error "unreachable: channelReqId not found in markSegmentFailed", channelReqId = $channelReqId, error = e.msg return - self.tryFinalizeChannelReq(channelReqId) + await self.tryFinalizeChannelReq(channelReqId) proc markSegmentInflight( self: ReliableChannel, channelReqId: RequestId, messagingReqId: RequestId @@ -248,14 +239,15 @@ proc onReadyToSend( ## clear and encrypt only the application payload. let encRes = await Encrypt.request(m) let encrypted = encRes.valueOr: - MessageErrorEvent.emit( + ### TODO: Emitting of events from another layer is not completly ok to do so. + await mci.MessageErrorEvent.emit( self.brokerCtx, - MessageErrorEvent( - requestId: channelReqId, messageHash: "", error: "encryption failed: " & error - ), + requestId = channelReqId, + messageHash = "", + error = "encryption failed: " & error, ) ## Encryption failed *before* we could hand the segment to the - self.markSegmentFailed(channelReqId) + await self.markSegmentFailed(channelReqId) continue let wireBytes = seq[byte](encrypted) @@ -269,25 +261,24 @@ proc onReadyToSend( meta: LipWireReliableChannelVersion.toBytes(), ) - ## `sendHandler` is not annotated `(raises: [])`, but this listener is. + ## `messagingClient.send` is not annotated `(raises: [])`, but this listener is. ## Convert any raise to a Result error so the state machine handles ## both failure modes (Result.err and exception) through one path. let sendRes = try: - await self.sendHandler(envelope) + await self.messagingClient.send(envelope) except CatchableError as e: Result[RequestId, string].err("messaging send raised: " & e.msg) let messagingReqId = sendRes.valueOr: - MessageErrorEvent.emit( + ### TODO: Emitting of events from another layer is not completly ok to do so. + await mci.MessageErrorEvent.emit( self.brokerCtx, - MessageErrorEvent( - requestId: channelReqId, - messageHash: "", - error: "messaging send failed: " & error, - ), + requestId = channelReqId, + messageHash = "", + error = "messaging send failed: " & error, ) - self.markSegmentFailed(channelReqId) + await self.markSegmentFailed(channelReqId) continue self.markSegmentInflight(channelReqId, messagingReqId) @@ -390,13 +381,12 @@ proc onMessageReceived( ## the `Decrypt` request broker. let decRes = await Decrypt.request(payload) let plaintext = decRes.valueOr: - MessageErrorEvent.emit( + ### TODO: Emitting of events from another layer is not completly ok to do so. + await mci.MessageErrorEvent.emit( self.brokerCtx, - MessageErrorEvent( - requestId: RequestId(""), - messageHash: messageHash, - error: "decryption failed: " & error, - ), + requestId = RequestId(""), + messageHash = messageHash, + error = "decryption failed: " & error, ) return let plaintextBytes = seq[byte](plaintext) @@ -407,13 +397,11 @@ proc onMessageReceived( ## marker/contentTopic filter already ran, so surface it as an error event ## rather than dropping it silently. let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr: - MessageErrorEvent.emit( + await mci.MessageErrorEvent.emit( self.brokerCtx, - MessageErrorEvent( - requestId: RequestId(""), - messageHash: messageHash, - error: "SDS handleIncoming failed: " & error, - ), + requestId = RequestId(""), + messageHash = messageHash, + error = "SDS handleIncoming failed: " & error, ) return for content in deliverable: @@ -421,7 +409,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - sendHandler: SendHandler, + messagingClient: MessagingClientInterface, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -441,7 +429,7 @@ proc new*( ## typically constructs it as a closure over `MessagingClient.send`. Tests ## pass a fake to drive the send state machine without touching the network. let chn = T( - sendHandler: sendHandler, + messagingClient: messagingClient, channelId: channelId, contentTopic: contentTopic, senderId: senderId, @@ -495,9 +483,9 @@ proc new*( chn.onMessageFinal(evt.requestId, MessagingOutcome.Sent), ) - discard MessageErrorEvent.listen( + discard mci.MessageErrorEvent.listen( chn.brokerCtx, - proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + proc(evt: mci.MessageErrorEvent): Future[void] {.async: (raises: []).} = chn.onMessageFinal(evt.requestId, MessagingOutcome.Failed), ) diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 0717eb3f8..d73bff981 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -11,10 +11,11 @@ import chronos import chronicles import stew/byteutils -import brokers/broker_context +import brokers/broker_implement +import logos_delivery/api/messaging_client_interface +import logos_delivery/api/reliable_channel_manager_interface import logos_delivery/waku/events/message_events as waku_message_events -import logos_delivery/messaging/messaging_client import logos_delivery/waku/waku_core/topics import logos_delivery/waku/persistency/sds_persistency @@ -27,33 +28,13 @@ const SdsJobId = "sds" ## One persistency job shared by every channel's SDS state; rows are ## keyed by channelId. -type ReliableChannelManager* = ref object +type ReliableChannelManager* = ref object of ReliableChannelManagerInterface channels: Table[ChannelId, ReliableChannel] - messagingClient: MessagingClient ## Borrowed from the owning `Waku`. - sendHandler: SendHandler + messagingClient: MessagingClientInterface + ## Borrowed from the owning `Waku`. ## Default egress dispatch for channels created through this manager. ## Constructed at mount time as a closure over `MessagingClient.send` ## so the channel layer itself stays callable-only. - brokerCtx: BrokerContext - -proc new*( - T: type ReliableChannelManager, - messagingClient: MessagingClient, - sendHandler: SendHandler, - brokerCtx: BrokerContext = globalBrokerContext(), -): Result[T, string] = - if messagingClient.isNil(): - return err("messaging client is required") - if sendHandler.isNil(): - return err("sendHandler is required") - return ok( - T( - channels: initTable[ChannelId, ReliableChannel](), - messagingClient: messagingClient, - sendHandler: sendHandler, - brokerCtx: brokerCtx, - ) - ) proc start*(self: ReliableChannelManager): Result[void, string] = ## Placeholder: per-channel listeners are installed in `ReliableChannel.new`, @@ -80,83 +61,86 @@ proc sdsPersistence(): Option[Persistence] = return none(Persistence) return some(newSdsPersistence(job)) -proc createReliableChannel*( - self: ReliableChannelManager, - channelId: ChannelId, - contentTopic: ContentTopic, - senderId: SdsParticipantID, - sendHandler: SendHandler = nil, -): Result[ChannelId, string] = - ## Spec entry point. The `sendHandler` and `rng` the channel needs are - ## sourced from the owning `ReliableChannelManager` rather than passed - ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` - ## request brokers — the application installs its own providers - ## (or `setNoopEncryption()`) before traffic flows. - ## - ## Segmentation, SDS and rate-limit configs will eventually be read - ## from the node's `NodeConfig`. Defaults for now. - ## - ## `sendHandler` defaults to the manager's default (constructed at mount - ## from `MessagingClient.send`); tests pass a fake to bypass the network. - if self.channels.hasKey(channelId): - return err("channel already exists: " & channelId) +BrokerImplement ReliableChannelManager of ReliableChannelManagerInterface: + proc new( + T: typedesc[ReliableChannelManager], messagingClient: MessagingClientInterface + ): T = + T( + channels: initTable[ChannelId, ReliableChannel](), + messagingClient: messagingClient, + ) - let segConfig = SegmentationConfig( - segmentSizeBytes: DefaultSegmentSizeBytes, - enableReedSolomon: false, - persistence: nil, - ) - let sdsConfig = SdsConfig( - acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, - maxRetransmissions: DefaultMaxRetransmissions, - causalHistorySize: DefaultCausalHistorySize, - persistence: sdsPersistence(), - ) - let rateConfig = RateLimitConfig( - epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch - ) + method createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + ): Future[Result[ChannelId, string]] {.async.} = + ## Spec entry point. The`rng` the channel needs are + ## sourced from the owning `ReliableChannelManager` rather than passed + ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` + ## request brokers — the application installs its own providers + ## (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) - let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: sdsPersistence(), + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) - let chn = ReliableChannel.new( - sendHandler = effectiveSendHandler, - channelId = channelId, - contentTopic = contentTopic, - senderId = senderId, - segConfig = segConfig, - sdsConfig = sdsConfig, - rateConfig = rateConfig, - brokerCtx = self.brokerCtx, - ) + let chn = ReliableChannel.new( + messagingClient = self.messagingClient, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) - self.channels[channelId] = chn - return ok(channelId) + self.channels[channelId] = chn + return ok(channelId) -proc closeChannel*( - self: ReliableChannelManager, channelId: ChannelId -): Future[Result[void, string]] {.async: (raises: []).} = - ## Stops the channel's SDS loops and releases the channel. Persisted SDS - ## state survives, so re-creating the channel restores it. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - self.channels.del(channelId) - await chn.stop() - return ok() + method closeChannel*( + self: ReliableChannelManager, channelId: ChannelId + ): Future[Result[void, string]] {.async.} = + ## Stops the channel's SDS loops and releases the channel. Persisted SDS + ## state survives, so re-creating the channel restores it. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + await chn.stop() + return ok() -proc send*( - self: ReliableChannelManager, - channelId: ChannelId, - appPayload: seq[byte], - ephemeral: bool = false, -): Future[Result[RequestId, string]] {.async: (raises: []).} = - ## Spec-level entry point. Looks the channel up by id and delegates - ## to `ReliableChannel.send`, which exposes the visible pipeline - ## segmentation -> sds -> rate_limit_manager -> encryption. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - return await chn.send(appPayload, ephemeral) + method sendOnChannel( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, + ): Future[Result[RequestId, string]] {.async.} = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) ## Inbound messages are not handed to the manager by direct call. Each ## `ReliableChannel` installs its own `MessageReceivedEvent` listener diff --git a/logos_delivery/channels/types.nim b/logos_delivery/channels/types.nim index ec663cf7b..b3a8e8362 100644 --- a/logos_delivery/channels/types.nim +++ b/logos_delivery/channels/types.nim @@ -1,15 +1,8 @@ ## Core identifier types for the Reliable Channel API. -import std/hashes -import logos_delivery/waku/api/types as api_types +import logos_delivery/api/types as api_types import ./scalable_data_sync/scalable_data_sync export scalable_data_sync export api_types - -type ChannelId* = SdsChannelID - -proc hash*(r: RequestId): Hash = - ## Allows `RequestId` to be used as a `Table` key. - hash(string(r)) diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim new file mode 100644 index 000000000..60dfabced --- /dev/null +++ b/logos_delivery/logos_delivery.nim @@ -0,0 +1,215 @@ +## LogosDelivery — the LogosDeliveryInterface facade implementation. +## +## Owns a `Waku` (node + messagingClient + reliableChannelManager) and exposes +## the three sub-interfaces through cached getters. Mirrors the persistence +## example's `PersistenceImpl` (nim-brokers/examples/persistence/nimlib): a +## `ref object of ` with `BrokerImplement`, sub-instances built on a +## `newInstanceCtx(self.brokerCtx)`, plus a `provideFactory`. +## +## Scope (decided): `initializeRequest(configPath)` creates + mounts the node but +## does NOT start it. `kernel()` is a stub until a KernelImpl exists. + +import results, chronos +import std/options # some()/Option for WakuNodeConf.mode +import std/json # newJArray/newJObject/%*/pretty in getAvailableConfigs + +import brokers/broker_context, brokers/broker_interface, brokers/broker_implement +import logos_delivery/api/logos_delivery_interface as logosdelivery_iface +import logos_delivery/api/types as api_types +import logos_delivery/api/kernel_interface as kernel_iface + +import + logos_delivery/waku/factory/waku, + logos_delivery/waku/factory/waku_state_info, + logos_delivery/messaging/messaging_client, + logos_delivery/channels/reliable_channel_manager + # Waku, getNodeInfoItem, mountMessagingClient, mountReliableChannelManager, stop +import tools/confutils/[cli_args, config_option_meta] # WakuNodeConf (+ .load) + +type LogosDelivery* = ref object of LogosDeliveryInterface + kernelI: KernelInterface ## lazily-built sub-interface caches (keep them reachable) + waku: Waku ## the owned node facade (built in initializeRequest) + messagingClient: MessagingClient + reliableChannelManager: ReliableChannelManager + +proc loadConf(configPath: string): Result[WakuNodeConf, string] = + ## Delegates to cli_args' concrete loader so confutils' `load` macro expands in + ## cli_args' full scope (avoids leaking undeclared identifiers / WakuMode clash here). + loadWakuNodeConfFromFile(configPath) + +proc createNode(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} = + let wakuConf = conf.toWakuConf().valueOr: + return err("Failed to handle the configuration: " & error) + + ## We are not defining app callbacks at node creation + let wakuRes = (await Waku.new(wakuConf)).valueOr: + error "waku initialization failed", error = error + return err("Failed setting up Waku: " & $error) + + return ok(wakuRes) + +proc initMessagingClient(self: LogosDelivery): Result[MessagingClient, string] = + let subCtx = newInstanceCtx(self.brokerCtx) + return ok( + MessagingClient.createUnderContext( + subCtx, self.waku.conf.p2pReliability, self.waku.node + ) + ) + +proc initReliableChannelManager( + self: LogosDelivery +): Result[ReliableChannelManager, string] = + let subCtx = newInstanceCtx(self.brokerCtx) + return ok(ReliableChannelManager.createUnderContext(subCtx, self.messagingClient)) + +BrokerImplement LogosDelivery of LogosDeliveryInterface: + method kernel( + self: LogosDelivery + ): Future[Result[KernelInterface, string]] {.async.} = + if self.waku.isNil(): + return err("not initialized; call startAsClient first") + + return err("kernel not yet implemented") + + method messaging( + self: LogosDelivery + ): Future[Result[MessagingClientInterface, string]] {.async.} = + if self.waku.isNil(): + return err("not initialized; call startAsClient first") + if self.messagingClient.isNil(): + return err("messaging client not mounted") + return ok(MessagingClientInterface(self.messagingClient)) + + method channels( + self: LogosDelivery + ): Future[Result[ReliableChannelManagerInterface, string]] {.async.} = + if self.waku.isNil(): + return err("not initialized; call startAsClient first") + if self.messagingClient.isNil(): + return err("not initialized; call startAsClient first") + + if self.reliableChannelManager.isNil(): + self.reliableChannelManager = initReliableChannelManager(self).valueOr: + return err("failed to initialize ReliableChannelManager: " & error) + self.reliableChannelManager.start().isOkOr: + return err("failed to start ReliableChannelManager: " & error) + + return ok(ReliableChannelManagerInterface(self.reliableChannelManager)) + + method startAsNode( + self: LogosDelivery, config: string + ): Future[Result[void, string]] {.async.} = + if not self.waku.isNil(): + return err("already initialized") + let conf = loadConf(config).valueOr: + return err("failed to load config: " & error) + # The restamp forces {.async: (raises: []).}, but createNode can raise. + try: + self.waku = (await createNode(conf)).valueOr: + return err("failed to create node: " & error) + + (await self.waku.start()).isOkOr: + return err("failed to start node: " & $error) + + return ok() + except CatchableError as e: + return err("initialize failed: " & e.msg) + + method startAsClient( + self: LogosDelivery, mode: api_types.WakuMode, preset: string + ): Future[Result[MessagingClientInterface, string]] {.async.} = + if not self.messagingClient.isNil(): + return err("already initialized") + if not self.waku.isNil(): + return err( + "already started as node; cannot start as client, but you can use as client" + ) + + try: + var conf: WakuNodeConf = ?defaultWakuNodeConf() + conf.mode = some(mode) + conf.preset = preset + + self.waku = (await createNode(conf)).valueOr: + return err("failed to create node: " & error) + + self.messagingClient = initMessagingClient(self).valueOr: + return err("failed to mount messaging client: " & error) + + (await self.waku.start()).isOkOr: + return err("failed to start node: " & $error) + self.messagingClient.start().isOkOr: + return err("failed to start messaging client: " & $error) + + return ok(MessagingClientInterface(self.messagingClient)) + except CatchableError as e: + return err("initialize failed: " & e.msg) + + method stop(self: LogosDelivery): Future[Result[void, string]] {.async.} = + if not self.reliableChannelManager.isNil(): + try: + await self.reliableChannelManager.stop() + self.reliableChannelManager.close() + self.reliableChannelManager = nil + except CatchableError as e: + return err("ReliableChannelManager stop failed: " & e.msg) + + if not self.messagingClient.isNil(): + try: + await self.messagingClient.stop() + self.messagingClient.close() + self.messagingClient = nil + except CatchableError as e: + return err("MessagingClient stop failed: " & e.msg) + + if not self.waku.isNil(): + try: + (await self.waku.stop()).isOkOr: + return err("Node stop failed: " & $error) + # `Waku` is a plain ref object (not a BrokerImplement) — `stop()` is its + # only teardown; there is no `close()`. + self.waku = nil + except CatchableError as e: + return err("Node stop failed: " & e.msg) + + return ok() + + method shutdown(self: LogosDelivery): Future[Result[void, string]] {.async.} = + return await self.stop() + + method getNodeInfo( + self: LogosDelivery, id: NodeInfoId + ): Future[Result[string, string]] {.async.} = + return ok(self.waku.stateInfo.getNodeInfoItem(id)) + + method getAvailableConfigs( + self: LogosDelivery + ): Future[Result[string, string]] {.async.} = + let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf) + var configOptionDetails = newJArray() + + for meta in optionMetas: + configOptionDetails.add( + %*{ + meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")", + "desc": meta.desc, + } + ) + + var jsonNode = newJObject() + jsonNode["configOptions"] = configOptionDetails + let asString = pretty(jsonNode) + return ok(pretty(jsonNode)) + +# FFI factory registration +proc setupProviders(ctx: BrokerContext): Result[void, string] = + ## Called by registerBrokerLibrary on the processing thread: construct the + ## main facade impl adopting the FFI context, wiring its providers under `ctx`. + discard LogosDelivery.createUnderContext(ctx) + ok() + +# DI factory registration: non ffi - nim lib users needs it. +LogosDeliveryInterface.provideFactory( + proc(): Result[LogosDeliveryInterface, string] {.gcsafe.} = + ok(LogosDeliveryInterface(LogosDelivery.createUnderContext(NewBrokerContext()))) +) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 90bdb0839..002bc1c9c 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -6,6 +6,7 @@ import logos_delivery/waku/compat/option_valueor import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility import brokers/broker_context +import logos_delivery/api/messaging_client_interface import logos_delivery/waku/[ waku_core, @@ -78,7 +79,7 @@ proc getMissingMsgsFromStore( proc processIncomingMessage( self: RecvService, pubsubTopic: string, message: WakuMessage -): bool = +): Future[bool] {.async.} = ## Return false if the incoming message is from a non-subscribed topic, ## or if the message is a duplicate (recently-seen). Otherwise, save it as ## recently-seen, emit a MessageReceivedEvent, and return true. @@ -100,7 +101,7 @@ proc processIncomingMessage( let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) self.recentReceivedMsgs.add(rxMsg) - MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) + await MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) return true proc checkStore*(self: RecvService) {.async.} = @@ -143,7 +144,7 @@ proc checkStore*(self: RecvService) {.async.} = let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) if missingMsgsRet.isOk(): for msgTuple in missingMsgsRet.get(): - if self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg): + if await self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg): info "recv service store-recovered message", msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic else: diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 92026be5d..7e12c4d2b 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -5,6 +5,7 @@ import logos_delivery/waku/compat/option_valueor import std/[sequtils, tables, options, typetraits] import chronos, chronicles, libp2p/utility import brokers/broker_context +import logos_delivery/api/messaging_client_interface import ./[send_processor, relay_processor, lightpush_processor, delivery_task], logos_delivery/waku/[ @@ -170,14 +171,14 @@ proc checkStoredMessages(self: SendService) {.async.} = await self.checkMsgsInStore(tasksToValidate) -proc reportTaskResult(self: SendService, task: DeliveryTask) = +proc reportTaskResult(self: SendService, task: DeliveryTask) {.async.} = case task.state of DeliveryState.SuccessfullyPropagated: # TODO: in case of unable to strore check messages shall we report success instead? if not task.propagateEventEmitted: info "Message successfully propagated", requestId = task.requestId, msgHash = task.msgHash.to0xHex() - MessagePropagatedEvent.emit( + await MessagePropagatedEvent.emit( self.brokerCtx, task.requestId, task.msgHash.to0xHex() ) task.propagateEventEmitted = true @@ -185,14 +186,14 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = of DeliveryState.SuccessfullyValidated: info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash.to0xHex() - MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex()) + await MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex()) return of DeliveryState.FailedToDeliver: error "Failed to send message", requestId = task.requestId, msgHash = task.msgHash.to0xHex(), error = task.errorDesc - MessageErrorEvent.emit( + await MessageErrorEvent.emit( self.brokerCtx, task.requestId, task.msgHash.to0xHex(), task.errorDesc ) return @@ -207,15 +208,16 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = error = "Message too old", age = task.messageAge() task.state = DeliveryState.FailedToDeliver - MessageErrorEvent.emit( + await MessageErrorEvent.emit( self.brokerCtx, task.requestId, task.msgHash.to0xHex(), "Unable to send within retry time window", ) -proc evaluateAndCleanUp(self: SendService) = - self.taskCache.forEach(self.reportTaskResult(it)) +proc evaluateAndCleanUp(self: SendService) {.async.} = + for task in self.taskCache: + await self.reportTaskResult(task) self.taskCache.keepItIf( it.state != DeliveryState.SuccessfullyValidated and it.state != DeliveryState.FailedToDeliver @@ -241,7 +243,7 @@ proc serviceLoop(self: SendService) {.async.} = while true: await self.trySendMessages() await self.checkStoredMessages() - self.evaluateAndCleanUp() + await self.evaluateAndCleanUp() ## TODO: add circuit breaker to avoid infinite looping in case of persistent failures ## Use OnlineStateChange observers to pause/resume the loop await sleepAsync(ServiceLoopInterval) @@ -264,6 +266,6 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} = contentTopic = task.msg.contentTopic, error = error await self.sendProcessor.process(task) - reportTaskResult(self, task) + await reportTaskResult(self, task) if task.state != DeliveryState.FailedToDeliver: self.addTask(task) diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 1d3892250..aec1ca684 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,24 +1,19 @@ import results, chronos import chronicles +import brokers/broker_implement +import logos_delivery/api/messaging_client_interface import logos_delivery/waku/api/types, logos_delivery/waku/node/[waku_node, subscription_manager], logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task -type MessagingClient* = ref object +type MessagingClient* = ref object of MessagingClientInterface node: WakuNode sendService*: SendService recvService*: RecvService started: bool -proc new*( - T: type MessagingClient, useP2PReliability: bool, node: WakuNode -): Result[T, string] = - let sendService = ?SendService.new(useP2PReliability, node) - let recvService = RecvService.new(node) - ok(T(node: node, sendService: sendService, recvService: recvService)) - proc start*(self: MessagingClient): Result[void, string] = if self.started: return ok() @@ -34,26 +29,45 @@ proc stop*(self: MessagingClient) {.async.} = await self.recvService.stopRecvService() self.started = false -proc send*( - self: MessagingClient, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async.} = - ## High-level messaging API send. Auto-subscribes to the content topic - ## (so the local node sees its own gossipsub broadcast), builds a - ## `DeliveryTask`, and hands it to the send service. Returns the request - ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. - let isSubbed = - self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) - if not isSubbed: - info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: - warn "Failed to auto-subscribe", error = error - return err("Failed to auto-subscribe before sending: " & error) +BrokerImplement MessagingClient of MessagingClientInterface: + proc new*(T: typedesc[MessagingClient], useP2PReliability: bool, node: WakuNode): T = + let sendService = SendService.new(useP2PReliability, node).valueOr: + error "Failed to initialize SendService", error = error + quit(QuitFailure) - let requestId = RequestId.new(self.node.rng) + let recvService = RecvService.new(node) + T(node: node, sendService: sendService, recvService: recvService, started: false) - let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: - return err("MessagingClient.send: Failed to create delivery task: " & error) + method subscribe( + self: MessagingClient, contentTopic: ContentTopic + ): Future[Result[void, string]] {.async.} = + return self.node.subscriptionManager.subscribe(contentTopic) - asyncSpawn self.sendService.send(deliveryTask) + method unsubscribe( + self: MessagingClient, contentTopic: ContentTopic + ): Future[Result[void, string]] {.async.} = + return self.node.subscriptionManager.unsubscribe(contentTopic) - return ok(requestId) + method send( + self: MessagingClient, envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async.} = + ## High-level messaging API send. Auto-subscribes to the content topic + ## (so the local node sees its own gossipsub broadcast), builds a + ## `DeliveryTask`, and hands it to the send service. Returns the request + ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. + let isSubbed = + self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + + let requestId = RequestId.new(self.node.rng) + + let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: + return err("MessagingClient.send: Failed to create delivery task: " & error) + + asyncSpawn self.sendService.send(deliveryTask) + + return ok(requestId) diff --git a/logos_delivery/waku/api/api.nim b/logos_delivery/waku/api/api.nim index 5be1e2086..50608b9b8 100644 --- a/logos_delivery/waku/api/api.nim +++ b/logos_delivery/waku/api/api.nim @@ -5,6 +5,10 @@ import chronicles, chronos, libp2p/peerid, results import logos_delivery/waku/factory/waku import logos_delivery/messaging/messaging_client +import logos_delivery/api/messaging_client_interface + # brings the interface `send` method into scope: the impl's `method send` in the + # BrokerImplement block is not exported, so the call below dispatches through the + # MessagingClientInterface method instead. import logos_delivery/waku/[requests/health_requests, waku_core, waku_node] import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/node/subscription_manager diff --git a/logos_delivery/waku/api/api_conf.nim b/logos_delivery/waku/api/api_conf.nim index 5810b1eb2..aa83d30cb 100644 --- a/logos_delivery/waku/api/api_conf.nim +++ b/logos_delivery/waku/api/api_conf.nim @@ -81,9 +81,10 @@ const TheWakuNetworkPreset* = ProtocolsConfig( ), ) -type WakuMode* {.pure.} = enum - Edge - Core +# WakuMode was elevated to logos_delivery/api/types; re-exported here so +# existing call sites are unaffected. +from logos_delivery/api/types import WakuMode +export WakuMode type NodeConfig* {. requiresInit, deprecated: "Use WakuNodeConf from tools/confutils/cli_args instead" diff --git a/logos_delivery/waku/api/types.nim b/logos_delivery/waku/api/types.nim index d51d46994..ae50cb773 100644 --- a/logos_delivery/waku/api/types.nim +++ b/logos_delivery/waku/api/types.nim @@ -2,77 +2,8 @@ import logos_delivery/waku/compat/option_valueor import libp2p/crypto/crypto {.push raises: [].} -import bearssl/rand, std/times, chronos -import stew/byteutils -import logos_delivery/waku/utils/requests as request_utils -import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time] -import logos_delivery/waku/requests/requests - -type - MessageEnvelope* = object - contentTopic*: ContentTopic - payload*: seq[byte] - ephemeral*: bool - meta*: seq[byte] - ## Opaque wire-format marker carried on the underlying WakuMessage. - ## Higher layers (e.g. Reliable Channel) stamp this so peers can route - ## ingress traffic to their corresponding layer. Empty by default. - - RequestId* = distinct string - - ConnectionStatus* {.pure.} = enum - Disconnected - PartiallyConnected - Connected - -proc new*(T: typedesc[RequestId], rng: crypto.Rng): T = - ## Generate a new RequestId using the provided RNG. - RequestId(request_utils.generateRequestId(rng)) - -proc `$`*(r: RequestId): string {.inline.} = - string(r) - -proc `==`*(a, b: RequestId): bool {.inline.} = - string(a) == string(b) - -proc init*( - T: type MessageEnvelope, - contentTopic: ContentTopic, - payload: seq[byte] | string, - ephemeral: bool = false, - meta: seq[byte] = @[], -): MessageEnvelope = - when payload is seq[byte]: - MessageEnvelope( - contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta - ) - else: - MessageEnvelope( - contentTopic: contentTopic, - payload: payload.toBytes(), - ephemeral: ephemeral, - meta: meta, - ) - -proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = - ## Convert a MessageEnvelope to a WakuMessage. - var wm = WakuMessage( - contentTopic: envelope.contentTopic, - payload: envelope.payload, - ephemeral: envelope.ephemeral, - meta: envelope.meta, - timestamp: getNowInNanosecondTime(), - ) - - ## TODO: First find out if proof is needed at all - ## Follow up: left it to the send logic to add RLN proof if needed and possible - # let requestedProof = ( - # waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat()) - # ).valueOr: - # warn "Failed to add RLN proof to WakuMessage: ", error = error - # return wm - - # wm.proof = requestedProof.proof - return wm +# logos_delivery/api/types; re-exported here so existing call sites are unaffected. +import logos_delivery/api/types +export types {.pop.} diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/events/health_events.nim index 4ff6f0c6c..572558e9f 100644 --- a/logos_delivery/waku/events/health_events.nim +++ b/logos_delivery/waku/events/health_events.nim @@ -8,7 +8,7 @@ export protocol_health, topic_health # Notify health changes to node connectivity EventBroker: - type EventConnectionStatusChange* = object + type ConnectionStatusChangeEvent* = object connectionStatus*: ConnectionStatus # Notify health changes to a subscribed topic @@ -16,12 +16,12 @@ EventBroker: # from/to content topic is provided in the new API (so we know which # content topics are of interest to the application) EventBroker: - type EventContentTopicHealthChange* = object + type ContentTopicHealthChangeEvent* = object contentTopic*: ContentTopic health*: TopicHealth # Notify health changes to a shard (pubsub topic) EventBroker: - type EventShardTopicHealthChange* = object + type ShardTopicHealthChangeEvent* = object topic*: PubsubTopic health*: TopicHealth diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/waku/events/message_events.nim index 9338fda67..6d9d93b55 100644 --- a/logos_delivery/waku/events/message_events.nim +++ b/logos_delivery/waku/events/message_events.nim @@ -1,31 +1,10 @@ import brokers/event_broker import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics] -export types - -EventBroker: - # Event emitted when a message is sent to the network - type MessageSentEvent* = object - requestId*: RequestId - messageHash*: string - -EventBroker: - # Event emitted when a message send operation fails - type MessageErrorEvent* = object - requestId*: RequestId - messageHash*: string - error*: string - -EventBroker: - # Confirmation that a message has been correctly delivered to some neighbouring nodes. - type MessagePropagatedEvent* = object - requestId*: RequestId - messageHash*: string - -EventBroker: - # Event emitted when a message is received via Waku - type MessageReceivedEvent* = object - messageHash*: string - message*: WakuMessage +from logos_delivery/api/messaging_client_interface import + MessageSentEvent, MessageErrorEvent, MessagePropagatedEvent, MessageReceivedEvent +export + types, MessageSentEvent, MessageErrorEvent, MessagePropagatedEvent, + MessageReceivedEvent EventBroker: # Internal event emitted when a message arrives from the network via any protocol diff --git a/logos_delivery/waku/factory/waku.nim b/logos_delivery/waku/factory/waku.nim index e15a92fd0..8f06cb223 100644 --- a/logos_delivery/waku/factory/waku.nim +++ b/logos_delivery/waku/factory/waku.nim @@ -384,35 +384,6 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return -proc mountMessagingClient*(waku: Waku): Result[void, string] = - if not waku.messagingClient.isNil(): - return err("messaging client already mounted") - if waku.node.started: - return err("cannot mount messaging client on a started node") - waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr: - return err("could not create messaging client: " & $error) - return ok() - -proc mountReliableChannelManager*(waku: Waku): Result[void, string] = - if not waku.reliableChannelManager.isNil(): - return err("reliable channel manager already mounted") - if waku.messagingClient.isNil(): - return err("reliable channel manager requires a mounted messaging client") - if waku.node.started: - return err("cannot mount reliable channel manager on a started node") - - let messagingClient = waku.messagingClient - let defaultSendHandler: SendHandler = proc( - envelope: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - return await messagingClient.send(envelope) - - waku.reliableChannelManager = ReliableChannelManager.new( - messagingClient, defaultSendHandler, waku.brokerCtx - ).valueOr: - return err("could not create reliable channel manager: " & $error) - return ok() - proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if waku.node.started: warn "start: waku node already started" diff --git a/logos_delivery/waku/factory/waku_state_info.nim b/logos_delivery/waku/factory/waku_state_info.nim index e608d5dd2..719c24688 100644 --- a/logos_delivery/waku/factory/waku_state_info.nim +++ b/logos_delivery/waku/factory/waku_state_info.nim @@ -8,18 +8,13 @@ import std/[tables, sequtils, strutils] import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid, stew/byteutils import logos_delivery/waku/[waku_node, net/bound_ports] -type - NodeInfoId* {.pure.} = enum - Version - Metrics - MyMultiaddresses - MyENR - MyPeerId - MyBoundPorts - MyMixPubKey +# NodeInfoId was elevated to logos_delivery/api/types; re-exported here so +# existing call sites are unaffected. +from logos_delivery/api/types import NodeInfoId +export NodeInfoId - WakuStateInfo* {.requiresInit.} = object - node: WakuNode +type WakuStateInfo* {.requiresInit.} = object + node: WakuNode proc getAllPossibleInfoItemIds*(self: WakuStateInfo): seq[NodeInfoId] = ## Returns all possible options that can be queried to learn about the node's information. diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 6e77816f0..2ed54c291 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -51,7 +51,7 @@ type NodeHealthMonitor* = ref object ## if it doesn't make sense for the protocol in question, this is set to zero. relayObserver: PubSubObserver peerEventListener: WakuPeerEventListener - shardHealthListener: EventShardTopicHealthChangeListener + shardHealthListener: ShardTopicHealthChangeEventListener eventLoopLagExceeded: bool ## set to true when the chronos event loop lag exceeds the severe threshold, ## causing the node health to be reported as EVENT_LOOP_LAGGING until lag recovers. @@ -513,7 +513,7 @@ proc healthLoop(hm: NodeHealthMonitor) {.async.} = hm.connectionStatus = newConnectionStatus - EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus) + ConnectionStatusChangeEvent.emit(hm.node.brokerCtx, newConnectionStatus) if not isNil(hm.onConnectionStatusChange): await hm.onConnectionStatusChange(newConnectionStatus) @@ -690,10 +690,10 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] = ).valueOr: return err("Failed to subscribe to peer events: " & error) - hm.shardHealthListener = EventShardTopicHealthChange.listen( + hm.shardHealthListener = ShardTopicHealthChangeEvent.listen( hm.node.brokerCtx, proc( - evt: EventShardTopicHealthChange + evt: ShardTopicHealthChangeEvent ): Future[void] {.async: (raises: []), gcsafe.} = hm.healthUpdateEvent.fire(), ).valueOr: @@ -728,7 +728,7 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = await hm.eventLoopMonitorFut.cancelAndWait() await WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener) - await EventShardTopicHealthChange.dropListener( + await ShardTopicHealthChangeEvent.dropListener( hm.node.brokerCtx, hm.shardHealthListener ) diff --git a/logos_delivery/waku/node/subscription_manager.nim b/logos_delivery/waku/node/subscription_manager.nim index 15b582ea6..90c98ef42 100644 --- a/logos_delivery/waku/node/subscription_manager.nim +++ b/logos_delivery/waku/node/subscription_manager.nim @@ -314,7 +314,7 @@ proc updateShardHealth( let newHealth = toTopicHealth(state.peers.len) if newHealth != state.currentHealth: state.currentHealth = newHealth - EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth) + ShardTopicHealthChangeEvent.emit(self.node.brokerCtx, shard, newHealth) proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) = ## Remove a peer from edgeFilterSubStates for the given shard, diff --git a/logos_delivery/waku/waku_core/message/message.nim b/logos_delivery/waku/waku_core/message/message.nim index acd7055a0..03965b051 100644 --- a/logos_delivery/waku/waku_core/message/message.nim +++ b/logos_delivery/waku/waku_core/message/message.nim @@ -5,25 +5,9 @@ {.push raises: [].} -import ../topics, ../time +# WakuMessage was elevated to logos_delivery/api/types; re-exported here so +# existing call sites are unaffected. +from logos_delivery/api/types import WakuMessage +export WakuMessage const MaxMetaAttrLength* = 64 # 64 bytes - -type WakuMessage* = object # Data payload transmitted. - payload*: seq[byte] - # String identifier that can be used for content-based filtering. - contentTopic*: ContentTopic - # Application specific metadata. - meta*: seq[byte] - # Number to discriminate different types of payload encryption. - # Compatibility with Whisper/WakuV1. - version*: uint32 - # Sender generated timestamp. - timestamp*: Timestamp - # The ephemeral attribute indicates signifies the transient nature of the - # message (if the message should be stored). - ephemeral*: bool - # Part of RFC 17: https://rfc.vac.dev/spec/17/ - # The proof attribute indicates that the message is not spam. This - # attribute will be used in the rln-relay protocol. - proof*: seq[byte] diff --git a/logos_delivery/waku/waku_core/topics/content_topic.nim b/logos_delivery/waku/waku_core/topics/content_topic.nim index 23da9a24a..8de949105 100644 --- a/logos_delivery/waku/waku_core/topics/content_topic.nim +++ b/logos_delivery/waku/waku_core/topics/content_topic.nim @@ -12,7 +12,10 @@ export parsing ## Content topic -type ContentTopic* = string +# ContentTopic was elevated to logos_delivery/api/types; re-exported here so +# existing call sites are unaffected. +from logos_delivery/api/types import ContentTopic +export ContentTopic const DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto") diff --git a/logos_delivery/waku/waku_relay/protocol.nim b/logos_delivery/waku/waku_relay/protocol.nim index e677ec5a0..2a1ef3151 100644 --- a/logos_delivery/waku/waku_relay/protocol.nim +++ b/logos_delivery/waku/waku_relay/protocol.nim @@ -503,7 +503,7 @@ proc topicsHealthLoop(w: WakuRelay) {.async.} = w.topicsHealth[topic] = currentHealth - EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth) + ShardTopicHealthChangeEvent.emit(w.brokerCtx, topic, currentHealth) if not w.onTopicHealthChange.isNil(): futs.add(w.onTopicHealthChange(topic, currentHealth)) diff --git a/nim_brokers_instructions.md b/nim_brokers_instructions.md new file mode 100644 index 000000000..9465de289 --- /dev/null +++ b/nim_brokers_instructions.md @@ -0,0 +1,395 @@ +# Working with `nim-brokers` + +> Drop-in CLAUDE.md addon for any project that depends on the `brokers` nimble +> package. Type-safe, decoupled messaging on top of **chronos** + **results**. +> All public APIs are exception-free: errors ride `Result[T, string]`, never raises. + +## Mental model + +Three macros, each declares a **broker type** and generates its full API. The +type *is* the channel — you call class-method-style on the typedesc: `T.emit`, +`T.request`, `T.listen`, `T.setProvider`. No instances, no singletons to wire. + +| Macro | Pattern | Producer side | Consumer side | +|-------|---------|---------------|---------------| +| `EventBroker` | pub/sub, many→many, fire-and-forget | `T.emit(...)` | `T.listen(handler)` | +| `RequestBroker` | request/response, **single** provider | `T.setProvider(handler)` | `T.request(...)` | +| `MultiRequestBroker` | request/response, **many** providers, fan-out | `T.setProvider(handler)` (N×) | `T.request(...)` | + +`(mt)` suffix → multi-thread variant (cross-thread dispatch). `(sync)` on +RequestBroker → blocking, non-async. `(API)` → FFI shared-library surface. + +Import only what you use: +```nim +import brokers/event_broker +import brokers/request_broker +import brokers/multi_request_broker +import brokers/broker_context # only if you need explicit contexts +``` + +--- + +## EventBroker — pub/sub + +```nim +import chronos, brokers/event_broker + +EventBroker: + type UserLoggedIn = object + userId*: int + name*: string + +# listen returns Result[ListenerHandle, string]; keep the handle to drop later +let h = UserLoggedIn.listen( + proc(evt: UserLoggedIn): Future[void] {.async: (raises: []).} = + info "login", id = evt.userId +) + +UserLoggedIn.emit(UserLoggedIn(userId: 7, name: "zoli")) # by value +UserLoggedIn.emit(userId = 7, name = "zoli") # by fields (inline-object only) + +await UserLoggedIn.dropListener(h.get()) # drop one — cancels its in-flight work +await UserLoggedIn.dropAllListeners() # drop all for this context +``` + +- `emit` is **sync** here (single-thread): snapshots listeners, `asyncSpawn`s + each. It does not await delivery — `await sleepAsync(0)` or yield to flush in tests. +- Handlers MUST be `{.async: (raises: []).}`. Swallow your own exceptions. +- `dropListener`/`dropAllListeners` are `async` and **cancel** in-flight handlers + before returning — safe teardown point before releasing resources. + +### Payload variants +```nim +EventBroker: + type Tick = void # payload-less signal: Tick.emit() / listen(proc(): Future[void]...) +EventBroker: + type Score = int # native/alias/external types auto-wrapped in distinct +EventBroker: + type Blob = ref object # ref payloads fine + data*: seq[byte] +``` + +--- + +## RequestBroker — single provider request/response + +Two declaration styles. **Coupled** (named `type` + `proc`) and **proc-sugar** +(payload decoupled, broker named after the Capitalized verb). + +```nim +import chronos, brokers/request_broker + +# Coupled: broker name == type name == request() return payload +RequestBroker: + type FetchUser = object + name*: string + proc signature*(id: int): Future[Result[FetchUser, string]] {.async.} + +FetchUser.setProvider( + proc(id: int): Future[Result[FetchUser, string]] {.async.} = + ok(FetchUser(name: "u" & $id)) +).isOk() + +let r = await FetchUser.request(42) # Result[FetchUser, string] +FetchUser.clearProvider() +``` + +```nim +# Proc-sugar: broker = Capitalized verb, request() returns the RAW payload +RequestBroker: + proc getVersion(): Future[Result[string, string]] {.async.} # -> broker `GetVersion` + +GetVersion.setProvider( + proc(): Future[Result[string, string]] {.async.} = ok("1.2.3")).get() +let v = await GetVersion.request() # r.value is plain string, no unwrap +``` + +Rules & behaviors: +- **One provider per signature.** A second `setProvider` returns `err(...)` (no + silent override). `clearProvider()` first to swap. +- Two signature slots coexist: zero-arg and arg-based (overload by arity). +- Provider exceptions are caught → `err()`. Unset provider → `err(...)`. +- `isProvided()` checks registration. `T.request` is `async` here. + +### Sync mode — no event loop needed +```nim +RequestBroker(sync): + proc getId(): Result[int, string] # note: no Future, no {.async.} +GetId.setProvider(proc(): Result[int, string] = ok(42)).isOk() +let id = GetId.request() # blocking, returns Result directly +``` + +### void payload (action with no return value) +```nim +RequestBroker: + proc doReset(force: bool): Future[Result[void, string]] {.async.} +DoReset.setProvider(proc(force: bool): Future[Result[void, string]] {.async.} = + if force: ok() else: err("need force")).isOk() +``` + +--- + +## MultiRequestBroker — fan-out to many providers + +Async only. `request()` calls **all** providers via `allFinished`, returns +`Result[seq[Payload], string]`. Any provider failing fails the whole request. + +```nim +import chronos, brokers/multi_request_broker + +MultiRequestBroker: + type Quote = object + price*: int + proc signature*(sym: string): Future[Result[Quote, string]] {.async.} + +discard Quote.setProvider(proc(sym: string): Future[Result[Quote, string]] {.async.} = + ok(Quote(price: 100))) +discard Quote.setProvider(proc(sym: string): Future[Result[Quote, string]] {.async.} = + ok(Quote(price: 101))) + +let all = await Quote.request("BTC") # all.get() is seq[Quote], len == 2 +Quote.removeProvider(handle.get()) # remove one (handle from setProvider) +Quote.clearProviders() # remove all +``` + +- No providers registered → `ok(@[])` (empty, not error). +- Identical handler refs deduplicated on registration. +- `setProvider` returns `Result[ProviderHandle, string]`; capture it for `removeProvider`. + +--- + +## BrokerContext — scoping / multi-instance + +Every API takes an **optional first `BrokerContext` arg**. Omit it → the +thread-global context (`DefaultBrokerContext`). Use contexts to run independent +broker instances (per component, per test, per thread). + +```nim +import brokers/broker_context + +let ctx = NewBrokerContext() # globally-unique id (atomic) + +discard MyEvent.listen(ctx, handler) +MyEvent.emit(ctx, payload) +FetchUser.setProvider(ctx, provider) +let r = await FetchUser.request(ctx, 42) +await MyEvent.dropAllListeners(ctx) +``` + +Thread setup helpers (callable before the event loop starts): +| Call | Use | +|------|-----| +| `setThreadBrokerContext(ctx)` | adopt a context created elsewhere as this thread's global | +| `initThreadBrokerContext(): BrokerContext` | create + set as thread-global in one call | +| `threadGlobalBrokerContext()` | read current thread global (lock-free) | + +Async scoped swap (needs chronos loop): `lockGlobalBrokerContext` / +`lockNewGlobalBrokerContext` templates. + +--- + +## Multi-thread variants `(mt)` + +Add `(mt)`. Same surface, but **`emit` becomes async** (cross-thread dispatch +via `Channel[T]`). Build with `--threads:on`. + +```nim +EventBroker(mt): + type Job = object + id*: int + +# from any thread: +proc worker() {.thread.} = + waitFor Job.emit(Job(id: 1)) # mt emit is async — await / waitFor it +``` + +- Same-thread calls take a direct fast path; cross-thread go through a per-bucket + channel drained by one dispatch coroutine. fd cost is **O(threads)**, not per-broker. +- A thread that listens must keep its event loop alive (the broker dispatches on it). +- MT brokers accept capacity kwargs: `EventBroker(mt, queueDepth = ..., slabCapacity = ..., + maxPayloadBytes = ..., preset = "...")`. Omit for defaults. + +--- + +## Decision guide + +| You want… | Use | +|-----------|-----| +| Notify N listeners, don't care about replies | `EventBroker` | +| Ask one authority for an answer | `RequestBroker` | +| Blocking call, no async context | `RequestBroker(sync)` | +| Ask everyone, aggregate replies | `MultiRequestBroker` | +| Same pattern across OS threads | add `(mt)`, `--threads:on`, await `emit` | +| Multiple isolated instances | pass a `BrokerContext` first arg | +| Expose to C/C++/Python/Rust/Go | `(API)` + `registerBrokerLibrary` (see AGENTS.md) | + +## Gotchas + +- Handlers/providers are `raises: []` — never let an exception escape; return `err()`. +- `setProvider` on a RequestBroker that already has one **fails** — clear first. +- Single-thread `emit` returns immediately; await a yield before asserting in tests. +- A non-`object`/`ref object` broker type is auto-wrapped in `distinct`; construct + with `T(value)` and read with the base-type conversion. +- Keep all interaction with one context on one thread (single-thread brokers are + thread-local); cross-thread requires the `(mt)` variant. + +--- + +## FFI API `(API)` — expose brokers as a C/C++/Python/Rust/Go shared library + +Add `(API)` to `RequestBroker`/`EventBroker`. Same declaration syntax — it +additionally generates a fixed C ABI and typed foreign wrappers. Wire format is +CBOR; wrappers carry the typed surface. Build with `-d:BrokerFfiApi --threads:on +--app:lib`. + +```nim +{.push raises: [].} +import brokers/[event_broker, request_broker, broker_context, api_library] + +# Plain Nim object types used in signatures are AUTO-registered — no annotation. +type DeviceInfo* = object + deviceId*: int64 + name*: string + online*: bool + +RequestBroker(API): + type GetDevice = object # broker name == type name == response payload + deviceId*: int64 + name*: string + proc signature*(deviceId: int64): Future[Result[GetDevice, string]] {.async.} + +EventBroker(API): + type DeviceStatusChanged = object + deviceId*: int64 + online*: bool + timestampMs*: int64 +``` + +Providers + event emission live in one proc named **`setupProviders`** (the +generated runtime calls it on the processing thread during `createContext`): + +```nim +proc setupProviders(ctx: BrokerContext): Result[void, string] = + let r = GetDevice.setProvider(ctx, # always pass the ctx the runtime gives you + proc(deviceId: int64): Future[Result[GetDevice, string]] {.closure, async.} = + await DeviceStatusChanged.emit(ctx, # API emit is async — await it + DeviceStatusChanged(deviceId: deviceId, online: true, timestampMs: 0)) + ok(GetDevice(deviceId: deviceId, name: "u"))) + if r.isErr(): return err("register GetDevice: " & r.error()) + ok() + +# MUST be the last declaration in the module: +registerBrokerLibrary: + name: "mylib" # MUST match --nimMainPrefix and the .so basename + version: "1.0.0" # baked into _version() static string + initializeRequest: InitializeRequest # post-create config broker (optional) + shutdownRequest: ShutdownRequest # orderly teardown broker (optional) +{.pop.} +``` + +Build (name / `--nimMainPrefix` / `registerBrokerLibrary name` must all match): +``` +nim c -d:BrokerFfiApi --threads:on --app:lib --path:. \ + --outdir:build --nimMainPrefix:mylib mylib.nim +``` + +What you get — a fixed **11-function C ABI** per library: `_version`, +`_initialize` (once per process), `_createContext` (per instance), `_shutdown(ctx)`, +`_allocBuffer`, `_freeBuffer`, `_call`, `_subscribe`, `_unsubscribe`, `_listApis`, +`_getSchema`. `.h` (C) and `.hpp` (C++) are always emitted. + +| Flag | Emits | Notes | +|------|-------|-------| +| *(default)* | `.h`, `.hpp` | C + C++ always | +| `-d:BrokerFfiApiGenPy` | `.py` (cbor2) | next to the `.so` | +| `-d:BrokerFfiApiGenRust` | `_rs/` Cargo crate | ciborium + serde | +| `-d:BrokerFfiApiGenGo` | `_go/` Go module | fxamacker/cbor | + +FFI rules: +- `registerBrokerLibrary` is a **no-op without `-d:BrokerFfiApi`** — no `when defined` + guard needed; the normal in-process broker API still works. +- `(API)` brokers ride the MT lane, so they accept the same capacity kwargs as + `(mt)`: `RequestBroker(API, queueDepth = .., slabCapacity = .., maxPayloadBytes = .., + preset = "..")`. +- `_createContext()` is readiness-synchronous: returns only after providers + + listeners are installed and the event courier is live. +- Inspect generated Nim with `-d:brokerDebug` → `build/broker_debug/*.gen.nim`. + +--- + +## BrokerInterface / BrokerImplement — hierarchical / OOP layer + +An object-oriented facade over the brokers: an **interface** groups several +brokers behind one abstract type; an **implementation** provides per-instance +methods. Each instance gets its own `BrokerContext`, so two instances of the same +impl are fully isolated. Direct `instance.method()` calls **tunnel through broker +dispatch** (so provider mocks are honored — not a plain vtable call). + +```nim +import brokers/broker_interface +import brokers/broker_implement + +BrokerInterface(IGreeter): + EventBroker: + type Greeted = object + who: string + RequestBroker: + proc greet(name: string): Future[Result[string, string]] {.async.} + RequestBroker: + proc version(): Future[Result[string, string]] {.async.} + +type GreeterImpl = ref object of IGreeter # MUST be `ref object of ` + prefix: string + +BrokerImplement GreeterImpl of IGreeter: + proc new(T: typedesc[GreeterImpl], prefix: string): GreeterImpl = + GreeterImpl(prefix: prefix) # optional ctor; create() calls it + method greet(self: GreeterImpl, name: string): Future[Result[string, string]] {.async.} = + ok(self.prefix & name) + method version(self: GreeterImpl): Future[Result[string, string]] {.async.} = + ok("v2") +``` + +Use it: +```nim +let g = GreeterImpl.create(prefix = "hi ") # new() + wires providers under g.brokerCtx +echo (waitFor g.greet("sue")).value # "hi sue" — tunnels through Greet broker + +let base: IGreeter = g # virtual dispatch via the interface type +echo (waitFor base.greet("x")).value # resolves to the override + +# Each instance is isolated by its own context: +let a = GreeterImpl.create(prefix = "a:") +let b = GreeterImpl.create(prefix = "b:") +# a.brokerCtx != b.brokerCtx + +g.close() # clears THIS instance's providers + listeners; idempotent +``` + +Event facade (instance-scoped listen/emit — context is injected for you): +```nim +discard g.listen(Greeted, + proc(ev: Greeted): Future[void] {.async: (raises: []), gcsafe.} = …) +g.emit(Greeted, Greeted(who: "bob")) +``` + +Factory / dependency injection (resolve an impl behind the interface): +```nim +IGreeter.provideFactory( + proc(cfg: string): Result[IGreeter, string] = + ok(GreeterImpl.create(prefix = cfg))) +let d = IGreeter.create("cfg:") # Result[IGreeter, string]; last factory wins +``` + +Key points: +- The broker for `proc greet` is named **`Greet`** (Capitalized verb). Address it + directly with the instance context: `Greet.request(g.brokerCtx, "bob")`, + `Greet.clearProvider(g.brokerCtx)` (e.g. to install a mock). +- `Impl.create(args…)` = fresh context + `new` + provider wiring. + `Impl.createUnderContext(ctx, args…)` wires under an externally-supplied context + (the path the FFI runtime drives). +- `BrokerInterface(API, IName)` lowers the sub-brokers onto the MT/FFI lane so the + whole interface can be exposed as a shared library; `BrokerImplement` is unchanged. +- Sub-instances returned from a method (factory pattern) share the parent's + `classCtx` (routing) but get a distinct `instanceCtx` — see `classCtx()` / + `instanceCtx()` accessors. diff --git a/nimble.lock b/nimble.lock index 18ebde258..96aaebb9b 100644 --- a/nimble.lock +++ b/nimble.lock @@ -328,8 +328,8 @@ } }, "brokers": { - "version": "#v3.1.1", - "vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2", + "version": "#cf5ee65cc20211068d7191de7e5e177c0dc212fa", + "vcsRevision": "cf5ee65cc20211068d7191de7e5e177c0dc212fa", "url": "https://github.com/NagyZoltanPeter/nim-brokers.git", "downloadMethod": "git", "dependencies": [ @@ -341,7 +341,7 @@ "cbor_serialization" ], "checksums": { - "sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40" + "sha1": "9bb46550b5bb9bde6722697d678ea236e2143350" } }, "stint": { diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim index d8a7eabf2..376b253ae 100644 --- a/tests/api/test_api_health.nim +++ b/tests/api/test_api_health.nim @@ -32,35 +32,35 @@ proc waitForConnectionStatus( var future = newFuture[void]("waitForConnectionStatus") let handler: EventConnectionStatusChangeListenerProc = proc( - e: EventConnectionStatusChange + e: ConnectionStatusChangeEvent ) {.async: (raises: []), gcsafe.} = if not future.finished: if e.connectionStatus == expected: future.complete() - let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr: + let handle = ConnectionStatusChangeEvent.listen(brokerCtx, handler).valueOr: raiseAssert error try: if not await future.withTimeout(TestTimeout): raiseAssert "Timeout waiting for status: " & $expected finally: - await EventConnectionStatusChange.dropListener(brokerCtx, handle) + await ConnectionStatusChangeEvent.dropListener(brokerCtx, handle) proc waitForShardHealthy( brokerCtx: BrokerContext -): Future[EventShardTopicHealthChange] {.async.} = - var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy") +): Future[ShardTopicHealthChangeEvent] {.async.} = + var future = newFuture[ShardTopicHealthChangeEvent]("waitForShardHealthy") let handler: EventShardTopicHealthChangeListenerProc = proc( - e: EventShardTopicHealthChange + e: ShardTopicHealthChangeEvent ) {.async: (raises: []), gcsafe.} = if not future.finished: if e.health == TopicHealth.MINIMALLY_HEALTHY or e.health == TopicHealth.SUFFICIENTLY_HEALTHY: future.complete(e) - let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr: + let handle = ShardTopicHealthChangeEvent.listen(brokerCtx, handler).valueOr: raiseAssert error try: @@ -69,7 +69,7 @@ proc waitForShardHealthy( else: raiseAssert "Timeout waiting for shard health event" finally: - await EventShardTopicHealthChange.dropListener(brokerCtx, handle) + await ShardTopicHealthChangeEvent.dropListener(brokerCtx, handle) suite "LM API health checking": var @@ -94,7 +94,7 @@ suite "LM API health checking": lockNewGlobalBrokerContext: var conf = defaultWakuNodeConf().valueOr: raiseAssert error - conf.mode = Core + conf.mode = some(WakuMode.Core) conf.listenAddress = parseIpAddress("0.0.0.0") conf.tcpPort = Port(0) conf.discv5UdpPort = Port(0) @@ -193,7 +193,7 @@ suite "LM API health checking": check isConnected == true - asyncTest "EventConnectionStatusChange, detect connect and disconnect": + asyncTest "ConnectionStatusChangeEvent, detect connect and disconnect": let connectFuture = waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected) @@ -205,7 +205,7 @@ suite "LM API health checking": await client.node.disconnectNode(servicePeerInfo) await disconnectFuture - asyncTest "EventShardTopicHealthChange, detect health improvement": + asyncTest "ShardTopicHealthChangeEvent, detect health improvement": client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) let healthEventFuture = waitForShardHealthy(client.brokerCtx) @@ -273,7 +273,7 @@ suite "LM API health checking": lockNewGlobalBrokerContext: var edgeConf = defaultWakuNodeConf().valueOr: raiseAssert error - edgeConf.mode = Edge + edgeConf.mode = some(WakuMode.Edge) edgeConf.listenAddress = parseIpAddress("0.0.0.0") edgeConf.tcpPort = Port(0) edgeConf.discv5UdpPort = Port(0) diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index a5bff3906..b89efb0e4 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -16,7 +16,7 @@ suite "WakuNodeConf - mode-driven toWakuConf": ## Given var conf = defaultWakuNodeConf().valueOr: raiseAssert error - conf.mode = Core + conf.mode = some(WakuMode.Core) conf.clusterId = some(1'u16) ## When @@ -37,7 +37,7 @@ suite "WakuNodeConf - mode-driven toWakuConf": ## Given var conf = defaultWakuNodeConf().valueOr: raiseAssert error - conf.mode = Edge + conf.mode = some(WakuMode.Edge) conf.clusterId = some(1'u16) ## When @@ -54,11 +54,11 @@ suite "WakuNodeConf - mode-driven toWakuConf": wakuConf.storeServiceConf.isSome() == false wakuConf.peerExchangeService == true - test "noMode uses explicit CLI flags as-is": + test "WakuMode.none uses explicit CLI flags as-is": ## Given var conf = defaultWakuNodeConf().valueOr: raiseAssert error - conf.mode = cli_args.WakuMode.noMode + conf.mode = none[WakuMode]() conf.relay = true conf.lightpush = false conf.clusterId = some(5'u16) @@ -79,7 +79,7 @@ suite "WakuNodeConf - mode-driven toWakuConf": ## Given - user sets relay=false but mode=Core should override var conf = defaultWakuNodeConf().valueOr: raiseAssert error - conf.mode = Core + conf.mode = some(WakuMode.Core) conf.relay = false # will be overridden by Core mode ## When @@ -101,7 +101,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs": require confRes.isOk() let conf = confRes.get() check: - conf.mode == cli_args.WakuMode.noMode + conf.mode == none[WakuMode]() conf.clusterId.isNone() conf.logLevel == logging.LogLevel.INFO @@ -113,7 +113,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs": require confRes.isOk() let conf = confRes.get() check: - conf.mode == Core + conf.mode == some(WakuMode.Core) conf.clusterId == some(42'u16) test "JSON with Edge mode": @@ -124,7 +124,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs": require confRes.isOk() let conf = confRes.get() check: - conf.mode == Edge + conf.mode == some(WakuMode.Edge) test "JSON with logLevel": ## Given / When diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index 480cb235c..d0e1441e2 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -228,8 +228,7 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() - let ds = - MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient") + let ds = MessagingClient.new(false, nodeA) ds.start().expect("Failed to start MessagingClient") let monitorA = NodeHealthMonitor.new(nodeA) @@ -332,8 +331,7 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() - let ds = - MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient") + let ds = MessagingClient.new(false, nodeA) ds.start().expect("Failed to start MessagingClient") let subMgr = nodeA.subscriptionManager @@ -392,13 +390,13 @@ suite "Health Monitor - events": check lastStatus == ConnectionStatus.PartiallyConnected - var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth") + var shardHealthFut = newFuture[ShardTopicHealthChangeEvent]("waitForShardHealth") - let shardHealthLis = EventShardTopicHealthChange + let shardHealthLis = ShardTopicHealthChangeEvent .listen( nodeA.brokerCtx, proc( - evt: EventShardTopicHealthChange + evt: ShardTopicHealthChangeEvent ): Future[void] {.async: (raises: []), gcsafe.} = if not shardHealthFut.finished and ( evt.health == TopicHealth.MINIMALLY_HEALTHY or @@ -413,7 +411,7 @@ suite "Health Monitor - events": subMgr.subscribe(contentTopic).expect("Failed to subscribe") let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit) - await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis) + await ShardTopicHealthChangeEvent.dropListener(nodeA.brokerCtx, shardHealthLis) check shardHealthOk == true check nodeA.subscriptionManager.edgeFilterSubStates.len > 0 diff --git a/tests/test_waku.nim b/tests/test_waku.nim index ff009c3ba..d69b54e4b 100644 --- a/tests/test_waku.nim +++ b/tests/test_waku.nim @@ -14,7 +14,7 @@ suite "Waku API - Create node": ## Given var nodeConf = defaultWakuNodeConf().valueOr: raiseAssert "defaultWakuNodeConf failed: " & error - nodeConf.mode = Core + nodeConf.mode = some(WakuMode.Core) nodeConf.clusterId = some(3'u16) nodeConf.rest = false @@ -34,7 +34,7 @@ suite "Waku API - Create node": ## Given var nodeConf = defaultWakuNodeConf().valueOr: raiseAssert "defaultWakuNodeConf failed: " & error - nodeConf.mode = Core + nodeConf.mode = some(WakuMode.Core) nodeConf.clusterId = some(99'u16) nodeConf.rest = false nodeConf.numShardsInNetwork = 16 @@ -66,7 +66,7 @@ suite "Waku API - Create node": ## Given var nodeConf = defaultWakuNodeConf().valueOr: raiseAssert "defaultWakuNodeConf failed: " & error - nodeConf.mode = Core + nodeConf.mode = some(WakuMode.Core) nodeConf.clusterId = some(42'u16) nodeConf.rest = false nodeConf.entryNodes = @[ diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 5228dfff7..5a79a4945 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -20,6 +20,8 @@ import secp256k1, json +from logos_delivery/api/types import WakuMode + import logos_delivery/waku/factory/[waku_conf, conf_builder/conf_builder, networks_config], logos_delivery/waku/common/[logging], @@ -37,7 +39,7 @@ import ./envvar as confEnvvarDefs, ./envvar_net as confEnvvarNet export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet, ProtectedShard, - DefaultMaxWakuMessageSizeStr, DefaultAgentString + DefaultMaxWakuMessageSizeStr, DefaultAgentString, WakuMode logScope: topics = "waku cli args" @@ -60,11 +62,6 @@ type StartUpCommand* = enum noCommand # default, runs waku generateRlnKeystore # generates a new RLN keystore -type WakuMode* {.pure.} = enum - noMode # default - use explicit CLI flags as-is - Core # full service node - Edge # client-only node - type WakuNodeConf* = object configFile* {. desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)", @@ -168,10 +165,10 @@ type WakuNodeConf* = object ## General node config mode* {. desc: - "Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default: explicit CLI flags used.", - defaultValue: WakuMode.noMode, + "Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default (unset): explicit CLI flags used.", + defaultValue: none(WakuMode), name: "mode" - .}: WakuMode + .}: Option[WakuMode] preset* {. desc: @@ -958,6 +955,25 @@ proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] = except CatchableError: err(getCurrentExceptionMsg()) +proc loadWakuNodeConfFromFile*(configFile: string): ConfResult[WakuNodeConf] = + ## Load a WakuNodeConf from a specific TOML config file path (no CLI args). + ## Deliberately a CONCRETE proc (no `T: type` typedesc param): that keeps it + ## non-generic, so confutils' `load` macro expands HERE in cli_args' full scope + ## (its own `WakuMode` with `noMode`, `logging`, …) instead of at the caller's + ## scope — where a different `WakuMode` (api/types) would break `noMode`. + try: + let conf = WakuNodeConf.load( + version = "", + cmdLine = @[], + secondarySources = proc( + conf: WakuNodeConf, sources: auto + ) {.gcsafe, raises: [ConfigurationError].} = + sources.addConfigFile(Toml, InputFile(configFile)), + ) + ok(conf) + except CatchableError: + err(getCurrentExceptionMsg()) + proc defaultWakuNodeConf*(): ConfResult[WakuNodeConf] = try: let conf = WakuNodeConf.load(version = "", cmdLine = @[]) @@ -1204,25 +1220,25 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = chronos.seconds(n.kadServiceLookupIntervalSec.int64) ) - # Mode-driven configuration overrides - case n.mode - of WakuMode.Core: - b.withRelay(true) - b.filterServiceConf.withEnabled(true) - b.withLightPush(true) - b.discv5Conf.withEnabled(true) - b.withPeerExchange(true) - b.withRendezvous(true) - b.rateLimitConf.withRateLimitsIfNotAssigned( - @["filter:100/1s", "lightpush:5/1s", "px:5/1s"] - ) - of WakuMode.Edge: - b.withPeerExchange(true) - b.withRelay(false) - b.filterServiceConf.withEnabled(false) - b.withLightPush(false) - b.storeServiceConf.withEnabled(false) - of WakuMode.noMode: - discard # use explicit CLI flags as-is + # Mode-driven configuration overrides. `none` (formerly `noMode`) means: + # use explicit CLI flags as-is. + if n.mode.isSome(): + case n.mode.get() + of WakuMode.Core: + b.withRelay(true) + b.filterServiceConf.withEnabled(true) + b.withLightPush(true) + b.discv5Conf.withEnabled(true) + b.withPeerExchange(true) + b.withRendezvous(true) + b.rateLimitConf.withRateLimitsIfNotAssigned( + @["filter:100/1s", "lightpush:5/1s", "px:5/1s"] + ) + of WakuMode.Edge: + b.withPeerExchange(true) + b.withRelay(false) + b.filterServiceConf.withEnabled(false) + b.withLightPush(false) + b.storeServiceConf.withEnabled(false) return b.build()