Structured Broker API: interface/impl split + nim-brokers integration

Squash of the initial structured-API POC work (1d684bf8..c37dcf84):
- WIP: structured Broker API interface + impl (Logos Delivery facade)
- Restructured folders; interfaces under logos_delivery/api; MessagingClient
  and ReliableChannelManager reworked as BrokerInterface/BrokerImplement
- API types elevated under logos_delivery/api/types.nim
- onelogosdelivery target; wakunode2 / FFI lib build fixes
- nim-brokers integration + version bumps; git_version into FFI lib version
- compile fixes for tests
This commit is contained in:
NagyZoltanPeter 2026-06-19 08:06:11 +02:00
parent a73035e28d
commit 8456245714
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
45 changed files with 2037 additions and 467 deletions

View File

@ -0,0 +1,83 @@
---
name: gitnexus-cli
description: "Use when the user needs to run GitNexus CLI commands like analyze/index a repo, check status, clean the index, generate a wiki, or list indexed repos. Examples: \"Index this repo\", \"Reanalyze the codebase\", \"Generate a wiki\""
---
# GitNexus CLI Commands
All commands work via `npx` — no global install required.
## Commands
### analyze — Build or refresh the index
```bash
npx gitnexus analyze
```
Run from the project root. This parses all source files, builds the knowledge graph, writes it to `.gitnexus/`, and generates CLAUDE.md / AGENTS.md context files.
| Flag | Effect |
| -------------- | ---------------------------------------------------------------- |
| `--force` | Force full re-index even if up to date |
| `--embeddings` | Enable embedding generation for semantic search (off by default) |
| `--drop-embeddings` | Drop existing embeddings on rebuild. By default, an `analyze` without `--embeddings` preserves them. |
**When to run:** First time in a project, after major code changes, or when `gitnexus://repo/{name}/context` reports the index is stale. In Claude Code, a PostToolUse hook detects staleness after `git commit` and `git merge` and notifies the agent to run `analyze` — the hook does not run analyze itself, to avoid blocking the agent for up to 120s and risking KuzuDB corruption on timeout.
### status — Check index freshness
```bash
npx gitnexus status
```
Shows whether the current repo has a GitNexus index, when it was last updated, and symbol/relationship counts. Use this to check if re-indexing is needed.
### clean — Delete the index
```bash
npx gitnexus clean
```
Deletes the `.gitnexus/` directory and unregisters the repo from the global registry. Use before re-indexing if the index is corrupt or after removing GitNexus from a project.
| Flag | Effect |
| --------- | ------------------------------------------------- |
| `--force` | Skip confirmation prompt |
| `--all` | Clean all indexed repos, not just the current one |
### wiki — Generate documentation from the graph
```bash
npx gitnexus wiki
```
Generates repository documentation from the knowledge graph using an LLM. Requires an API key (saved to `~/.gitnexus/config.json` on first use).
| Flag | Effect |
| ------------------- | ----------------------------------------- |
| `--force` | Force full regeneration |
| `--model <model>` | LLM model (default: minimax/minimax-m2.5) |
| `--base-url <url>` | LLM API base URL |
| `--api-key <key>` | LLM API key |
| `--concurrency <n>` | Parallel LLM calls (default: 3) |
| `--gist` | Publish wiki as a public GitHub Gist |
### list — Show all indexed repos
```bash
npx gitnexus list
```
Lists all repositories registered in `~/.gitnexus/registry.json`. The MCP `list_repos` tool provides the same information.
## After Indexing
1. **Read `gitnexus://repo/{name}/context`** to verify the index loaded
2. Use the other GitNexus skills (`exploring`, `debugging`, `impact-analysis`, `refactoring`) for your task
## Troubleshooting
- **"Not inside a git repository"**: Run from a directory inside a git repo
- **Index is stale after re-analyzing**: Restart Claude Code to reload the MCP server
- **Embeddings slow**: Omit `--embeddings` (it's off by default) or set `OPENAI_API_KEY` for faster API-based embedding

View File

@ -0,0 +1,89 @@
---
name: gitnexus-debugging
description: "Use when the user is debugging a bug, tracing an error, or asking why something fails. Examples: \"Why is X failing?\", \"Where does this error come from?\", \"Trace this bug\""
---
# Debugging with GitNexus
## When to Use
- "Why is this function failing?"
- "Trace where this error comes from"
- "Who calls this method?"
- "This endpoint returns 500"
- Investigating bugs, errors, or unexpected behavior
## Workflow
```
1. gitnexus_query({query: "<error or symptom>"}) → Find related execution flows
2. gitnexus_context({name: "<suspect>"}) → See callers/callees/processes
3. READ gitnexus://repo/{name}/process/{name} → Trace execution flow
4. gitnexus_cypher({query: "MATCH path..."}) → Custom traces if needed
```
> If "Index is stale" → run `npx gitnexus analyze` in terminal.
## Checklist
```
- [ ] Understand the symptom (error message, unexpected behavior)
- [ ] gitnexus_query for error text or related code
- [ ] Identify the suspect function from returned processes
- [ ] gitnexus_context to see callers and callees
- [ ] Trace execution flow via process resource if applicable
- [ ] gitnexus_cypher for custom call chain traces if needed
- [ ] Read source files to confirm root cause
```
## Debugging Patterns
| Symptom | GitNexus Approach |
| -------------------- | ---------------------------------------------------------- |
| Error message | `gitnexus_query` for error text → `context` on throw sites |
| Wrong return value | `context` on the function → trace callees for data flow |
| Intermittent failure | `context` → look for external calls, async deps |
| Performance issue | `context` → find symbols with many callers (hot paths) |
| Recent regression | `detect_changes` to see what your changes affect |
## Tools
**gitnexus_query** — find code related to error:
```
gitnexus_query({query: "payment validation error"})
→ Processes: CheckoutFlow, ErrorHandling
→ Symbols: validatePayment, handlePaymentError, PaymentException
```
**gitnexus_context** — full context for a suspect:
```
gitnexus_context({name: "validatePayment"})
→ Incoming calls: processCheckout, webhookHandler
→ Outgoing calls: verifyCard, fetchRates (external API!)
→ Processes: CheckoutFlow (step 3/7)
```
**gitnexus_cypher** — custom call chain traces:
```cypher
MATCH path = (a)-[:CodeRelation {type: 'CALLS'}*1..2]->(b:Function {name: "validatePayment"})
RETURN [n IN nodes(path) | n.name] AS chain
```
## Example: "Payment endpoint returns 500 intermittently"
```
1. gitnexus_query({query: "payment error handling"})
→ Processes: CheckoutFlow, ErrorHandling
→ Symbols: validatePayment, handlePaymentError
2. gitnexus_context({name: "validatePayment"})
→ Outgoing calls: verifyCard, fetchRates (external API!)
3. READ gitnexus://repo/my-app/process/CheckoutFlow
→ Step 3: validatePayment → calls fetchRates (external)
4. Root cause: fetchRates calls external API without proper timeout
```

View File

@ -0,0 +1,78 @@
---
name: gitnexus-exploring
description: "Use when the user asks how code works, wants to understand architecture, trace execution flows, or explore unfamiliar parts of the codebase. Examples: \"How does X work?\", \"What calls this function?\", \"Show me the auth flow\""
---
# Exploring Codebases with GitNexus
## When to Use
- "How does authentication work?"
- "What's the project structure?"
- "Show me the main components"
- "Where is the database logic?"
- Understanding code you haven't seen before
## Workflow
```
1. READ gitnexus://repos → Discover indexed repos
2. READ gitnexus://repo/{name}/context → Codebase overview, check staleness
3. gitnexus_query({query: "<what you want to understand>"}) → Find related execution flows
4. gitnexus_context({name: "<symbol>"}) → Deep dive on specific symbol
5. READ gitnexus://repo/{name}/process/{name} → Trace full execution flow
```
> If step 2 says "Index is stale" → run `npx gitnexus analyze` in terminal.
## Checklist
```
- [ ] READ gitnexus://repo/{name}/context
- [ ] gitnexus_query for the concept you want to understand
- [ ] Review returned processes (execution flows)
- [ ] gitnexus_context on key symbols for callers/callees
- [ ] READ process resource for full execution traces
- [ ] Read source files for implementation details
```
## Resources
| Resource | What you get |
| --------------------------------------- | ------------------------------------------------------- |
| `gitnexus://repo/{name}/context` | Stats, staleness warning (~150 tokens) |
| `gitnexus://repo/{name}/clusters` | All functional areas with cohesion scores (~300 tokens) |
| `gitnexus://repo/{name}/cluster/{name}` | Area members with file paths (~500 tokens) |
| `gitnexus://repo/{name}/process/{name}` | Step-by-step execution trace (~200 tokens) |
## Tools
**gitnexus_query** — find execution flows related to a concept:
```
gitnexus_query({query: "payment processing"})
→ Processes: CheckoutFlow, RefundFlow, WebhookHandler
→ Symbols grouped by flow with file locations
```
**gitnexus_context** — 360-degree view of a symbol:
```
gitnexus_context({name: "validateUser"})
→ Incoming calls: loginHandler, apiMiddleware
→ Outgoing calls: checkToken, getUserById
→ Processes: LoginFlow (step 2/5), TokenRefresh (step 1/3)
```
## Example: "How does payment processing work?"
```
1. READ gitnexus://repo/my-app/context → 918 symbols, 45 processes
2. gitnexus_query({query: "payment processing"})
→ CheckoutFlow: processPayment → validateCard → chargeStripe
→ RefundFlow: initiateRefund → calculateRefund → processRefund
3. gitnexus_context({name: "processPayment"})
→ Incoming: checkoutHandler, webhookHandler
→ Outgoing: validateCard, chargeStripe, saveTransaction
4. Read src/payments/processor.ts for implementation details
```

View File

@ -0,0 +1,64 @@
---
name: gitnexus-guide
description: "Use when the user asks about GitNexus itself — available tools, how to query the knowledge graph, MCP resources, graph schema, or workflow reference. Examples: \"What GitNexus tools are available?\", \"How do I use GitNexus?\""
---
# GitNexus Guide
Quick reference for all GitNexus MCP tools, resources, and the knowledge graph schema.
## Always Start Here
For any task involving code understanding, debugging, impact analysis, or refactoring:
1. **Read `gitnexus://repo/{name}/context`** — codebase overview + check index freshness
2. **Match your task to a skill below** and **read that skill file**
3. **Follow the skill's workflow and checklist**
> If step 1 warns the index is stale, run `npx gitnexus analyze` in the terminal first.
## Skills
| Task | Skill to read |
| -------------------------------------------- | ------------------- |
| Understand architecture / "How does X work?" | `gitnexus-exploring` |
| Blast radius / "What breaks if I change X?" | `gitnexus-impact-analysis` |
| Trace bugs / "Why is X failing?" | `gitnexus-debugging` |
| Rename / extract / split / refactor | `gitnexus-refactoring` |
| Tools, resources, schema reference | `gitnexus-guide` (this file) |
| Index, status, clean, wiki CLI commands | `gitnexus-cli` |
## Tools Reference
| Tool | What it gives you |
| ---------------- | ------------------------------------------------------------------------ |
| `query` | Process-grouped code intelligence — execution flows related to a concept |
| `context` | 360-degree symbol view — categorized refs, processes it participates in |
| `impact` | Symbol blast radius — what breaks at depth 1/2/3 with confidence |
| `detect_changes` | Git-diff impact — what do your current changes affect |
| `rename` | Multi-file coordinated rename with confidence-tagged edits |
| `cypher` | Raw graph queries (read `gitnexus://repo/{name}/schema` first) |
| `list_repos` | Discover indexed repos |
## Resources Reference
Lightweight reads (~100-500 tokens) for navigation:
| Resource | Content |
| ---------------------------------------------- | ----------------------------------------- |
| `gitnexus://repo/{name}/context` | Stats, staleness check |
| `gitnexus://repo/{name}/clusters` | All functional areas with cohesion scores |
| `gitnexus://repo/{name}/cluster/{clusterName}` | Area members |
| `gitnexus://repo/{name}/processes` | All execution flows |
| `gitnexus://repo/{name}/process/{processName}` | Step-by-step trace |
| `gitnexus://repo/{name}/schema` | Graph schema for Cypher |
## Graph Schema
**Nodes:** File, Function, Class, Interface, Method, Community, Process
**Edges (via CodeRelation.type):** CALLS, IMPORTS, EXTENDS, IMPLEMENTS, DEFINES, MEMBER_OF, STEP_IN_PROCESS
```cypher
MATCH (caller)-[:CodeRelation {type: 'CALLS'}]->(f:Function {name: "myFunc"})
RETURN caller.name, caller.filePath
```

View File

@ -0,0 +1,97 @@
---
name: gitnexus-impact-analysis
description: "Use when the user wants to know what will break if they change something, or needs safety analysis before editing code. Examples: \"Is it safe to change X?\", \"What depends on this?\", \"What will break?\""
---
# Impact Analysis with GitNexus
## When to Use
- "Is it safe to change this function?"
- "What will break if I modify X?"
- "Show me the blast radius"
- "Who uses this code?"
- Before making non-trivial code changes
- Before committing — to understand what your changes affect
## Workflow
```
1. gitnexus_impact({target: "X", direction: "upstream"}) → What depends on this
2. READ gitnexus://repo/{name}/processes → Check affected execution flows
3. gitnexus_detect_changes() → Map current git changes to affected flows
4. Assess risk and report to user
```
> If "Index is stale" → run `npx gitnexus analyze` in terminal.
## Checklist
```
- [ ] gitnexus_impact({target, direction: "upstream"}) to find dependents
- [ ] Review d=1 items first (these WILL BREAK)
- [ ] Check high-confidence (>0.8) dependencies
- [ ] READ processes to check affected execution flows
- [ ] gitnexus_detect_changes() for pre-commit check
- [ ] Assess risk level and report to user
```
## Understanding Output
| Depth | Risk Level | Meaning |
| ----- | ---------------- | ------------------------ |
| d=1 | **WILL BREAK** | Direct callers/importers |
| d=2 | LIKELY AFFECTED | Indirect dependencies |
| d=3 | MAY NEED TESTING | Transitive effects |
## Risk Assessment
| Affected | Risk |
| ------------------------------ | -------- |
| <5 symbols, few processes | LOW |
| 5-15 symbols, 2-5 processes | MEDIUM |
| >15 symbols or many processes | HIGH |
| Critical path (auth, payments) | CRITICAL |
## Tools
**gitnexus_impact** — the primary tool for symbol blast radius:
```
gitnexus_impact({
target: "validateUser",
direction: "upstream",
minConfidence: 0.8,
maxDepth: 3
})
→ d=1 (WILL BREAK):
- loginHandler (src/auth/login.ts:42) [CALLS, 100%]
- apiMiddleware (src/api/middleware.ts:15) [CALLS, 100%]
→ d=2 (LIKELY AFFECTED):
- authRouter (src/routes/auth.ts:22) [CALLS, 95%]
```
**gitnexus_detect_changes** — git-diff based impact analysis:
```
gitnexus_detect_changes({scope: "staged"})
→ Changed: 5 symbols in 3 files
→ Affected: LoginFlow, TokenRefresh, APIMiddlewarePipeline
→ Risk: MEDIUM
```
## Example: "What breaks if I change validateUser?"
```
1. gitnexus_impact({target: "validateUser", direction: "upstream"})
→ d=1: loginHandler, apiMiddleware (WILL BREAK)
→ d=2: authRouter, sessionManager (LIKELY AFFECTED)
2. READ gitnexus://repo/my-app/processes
→ LoginFlow and TokenRefresh touch validateUser
3. Risk: 2 direct callers, 2 processes = MEDIUM
```

View File

@ -0,0 +1,121 @@
---
name: gitnexus-refactoring
description: "Use when the user wants to rename, extract, split, move, or restructure code safely. Examples: \"Rename this function\", \"Extract this into a module\", \"Refactor this class\", \"Move this to a separate file\""
---
# Refactoring with GitNexus
## When to Use
- "Rename this function safely"
- "Extract this into a module"
- "Split this service"
- "Move this to a new file"
- Any task involving renaming, extracting, splitting, or restructuring code
## Workflow
```
1. gitnexus_impact({target: "X", direction: "upstream"}) → Map all dependents
2. gitnexus_query({query: "X"}) → Find execution flows involving X
3. gitnexus_context({name: "X"}) → See all incoming/outgoing refs
4. Plan update order: interfaces → implementations → callers → tests
```
> If "Index is stale" → run `npx gitnexus analyze` in terminal.
## Checklists
### Rename Symbol
```
- [ ] gitnexus_rename({symbol_name: "oldName", new_name: "newName", dry_run: true}) — preview all edits
- [ ] Review graph edits (high confidence) and ast_search edits (review carefully)
- [ ] If satisfied: gitnexus_rename({..., dry_run: false}) — apply edits
- [ ] gitnexus_detect_changes() — verify only expected files changed
- [ ] Run tests for affected processes
```
### Extract Module
```
- [ ] gitnexus_context({name: target}) — see all incoming/outgoing refs
- [ ] gitnexus_impact({target, direction: "upstream"}) — find all external callers
- [ ] Define new module interface
- [ ] Extract code, update imports
- [ ] gitnexus_detect_changes() — verify affected scope
- [ ] Run tests for affected processes
```
### Split Function/Service
```
- [ ] gitnexus_context({name: target}) — understand all callees
- [ ] Group callees by responsibility
- [ ] gitnexus_impact({target, direction: "upstream"}) — map callers to update
- [ ] Create new functions/services
- [ ] Update callers
- [ ] gitnexus_detect_changes() — verify affected scope
- [ ] Run tests for affected processes
```
## Tools
**gitnexus_rename** — automated multi-file rename:
```
gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: true})
→ 12 edits across 8 files
→ 10 graph edits (high confidence), 2 ast_search edits (review)
→ Changes: [{file_path, edits: [{line, old_text, new_text, confidence}]}]
```
**gitnexus_impact** — map all dependents first:
```
gitnexus_impact({target: "validateUser", direction: "upstream"})
→ d=1: loginHandler, apiMiddleware, testUtils
→ Affected Processes: LoginFlow, TokenRefresh
```
**gitnexus_detect_changes** — verify your changes after refactoring:
```
gitnexus_detect_changes({scope: "all"})
→ Changed: 8 files, 12 symbols
→ Affected processes: LoginFlow, TokenRefresh
→ Risk: MEDIUM
```
**gitnexus_cypher** — custom reference queries:
```cypher
MATCH (caller)-[:CodeRelation {type: 'CALLS'}]->(f:Function {name: "validateUser"})
RETURN caller.name, caller.filePath ORDER BY caller.filePath
```
## Risk Rules
| Risk Factor | Mitigation |
| ------------------- | ----------------------------------------- |
| Many callers (>5) | Use gitnexus_rename for automated updates |
| Cross-area refs | Use detect_changes after to verify scope |
| String/dynamic refs | gitnexus_query to find them |
| External/public API | Version and deprecate properly |
## Example: Rename `validateUser` to `authenticateUser`
```
1. gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: true})
→ 12 edits: 10 graph (safe), 2 ast_search (review)
→ Files: validator.ts, login.ts, middleware.ts, config.json...
2. Review ast_search edits (config.json: dynamic reference!)
3. gitnexus_rename({symbol_name: "validateUser", new_name: "authenticateUser", dry_run: false})
→ Applied 12 edits across 8 files
4. gitnexus_detect_changes({scope: "all"})
→ Affected: LoginFlow, TokenRefresh
→ Risk: MEDIUM — run tests for these flows
```

View File

@ -513,10 +513,14 @@ Language: Nim 2.x | License: MIT or Apache 2.0
Note: For specific version requirements, check `logos_delivery.nimble`.
## Working with nim-brokers
@nim_brokers_instructions.md
<!-- gitnexus:start -->
# GitNexus — Code Intelligence
This project is indexed by GitNexus as **logos-delivery** (2076 symbols, 2564 relationships, 12 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
This project is indexed by GitNexus as **logos-delivery** (13906 symbols, 23819 relationships, 294 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.

View File

@ -1 +1,45 @@
@AGENTS.md
<!-- gitnexus:start -->
# GitNexus — Code Intelligence
This project is indexed by GitNexus as **logos-delivery** (13906 symbols, 23819 relationships, 294 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.
## Always Do
- **MUST run impact analysis before editing any symbol.** Before modifying a function, class, or method, run `gitnexus_impact({target: "symbolName", direction: "upstream"})` and report the blast radius (direct callers, affected processes, risk level) to the user.
- **MUST run `gitnexus_detect_changes()` before committing** to verify your changes only affect expected symbols and execution flows.
- **MUST warn the user** if impact analysis returns HIGH or CRITICAL risk before proceeding with edits.
- When exploring unfamiliar code, use `gitnexus_query({query: "concept"})` to find execution flows instead of grepping. It returns process-grouped results ranked by relevance.
- When you need full context on a specific symbol — callers, callees, which execution flows it participates in — use `gitnexus_context({name: "symbolName"})`.
## Never Do
- NEVER edit a function, class, or method without first running `gitnexus_impact` on it.
- NEVER ignore HIGH or CRITICAL risk warnings from impact analysis.
- NEVER rename symbols with find-and-replace — use `gitnexus_rename` which understands the call graph.
- NEVER commit changes without running `gitnexus_detect_changes()` to check affected scope.
## Resources
| Resource | Use for |
|----------|---------|
| `gitnexus://repo/logos-delivery/context` | Codebase overview, check index freshness |
| `gitnexus://repo/logos-delivery/clusters` | All functional areas |
| `gitnexus://repo/logos-delivery/processes` | All execution flows |
| `gitnexus://repo/logos-delivery/process/{name}` | Step-by-step execution trace |
## CLI
| Task | Read this skill file |
|------|---------------------|
| Understand architecture / "How does X work?" | `.claude/skills/gitnexus/gitnexus-exploring/SKILL.md` |
| Blast radius / "What breaks if I change X?" | `.claude/skills/gitnexus/gitnexus-impact-analysis/SKILL.md` |
| Trace bugs / "Why is X failing?" | `.claude/skills/gitnexus/gitnexus-debugging/SKILL.md` |
| Rename / extract / split / refactor | `.claude/skills/gitnexus/gitnexus-refactoring/SKILL.md` |
| Tools, resources, schema reference | `.claude/skills/gitnexus/gitnexus-guide/SKILL.md` |
| Index, status, clean, wiki CLI commands | `.claude/skills/gitnexus/gitnexus-cli/SKILL.md` |
<!-- gitnexus:end -->

View File

@ -416,7 +416,7 @@ docker-liteprotocoltester-push:
################
## C Bindings ##
################
.PHONY: cbindings cwaku_example liblogosdelivery liblogosdelivery_example
.PHONY: cbindings cwaku_example libwaku liblogosdelivery liblogosdelivery_example onelogosdelivery
detected_OS ?= Linux
ifeq ($(OS),Windows_NT)
@ -450,6 +450,12 @@ else
$(NIMBLE) --verbose liblogosdelivery$(BUILD_COMMAND) logos_delivery.nimble
endif
# New single-root Logos Delivery library (BrokerFfiApi + all wrappers), built from
# ./logos_delivery.nim. Single cross-platform task (no $(BUILD_COMMAND) suffix).
# Set SRCGEN=1 to also dump the broker-generated sources (-d:brokerDebug).
onelogosdelivery: | build-deps librln
$(NIMBLE) --verbose onelogosdelivery logos_delivery.nimble
logosdelivery_example: | build liblogosdelivery
@echo -e $(BUILD_MSG) "build/$@"
ifeq ($(detected_OS),Darwin)

View File

@ -67,11 +67,11 @@ when isMainModule:
if args.ethRpcEndpoint == "":
# Create a basic configuration for the Waku node
# No RLN as we don't have an ETH RPC Endpoint
conf.mode = Core
conf.mode = some(WakuMode.Core)
conf.preset = "logos.dev"
else:
# Connect to TWN, use ETH RPC Endpoint for RLN
conf.mode = Core
conf.mode = some(WakuMode.Core)
conf.preset = "twn"
conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)]

View File

@ -115,9 +115,9 @@ proc logosdelivery_start_node(
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
let ConnectionStatusChangeListener = ConnectionStatusChangeEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
proc(event: ConnectionStatusChangeEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):
$newJsonEvent("connection_status_change", event),
).valueOr:
@ -150,7 +150,7 @@ proc logosdelivery_stop_node(
await MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)
await ConnectionStatusChangeEvent.dropAllListeners(ctx.myLib[].brokerCtx)
(await ctx.myLib[].stop()).isOkOr:
let errMsg = $error

View File

@ -8,3 +8,28 @@ export api
import logos_delivery/waku/factory/waku
export waku
import logos_delivery/api/logos_delivery_interface
export logos_delivery_interface
import logos_delivery/logos_delivery
import brokers/api_library # registerBrokerLibrary
# `git_version` is exported as a `{.strdefine.}` by several modules in the graph
# (waku.nim, waku_node.nim, nim-ffi), so it's ambiguous unqualified. Pin to
# waku's and expose an unambiguous local const for registerBrokerLibrary; the
# build injects `-d:git_version="$(git describe …)"`.
const ldGitVersion = waku.git_version
registerBrokerLibrary:
name:
"logosdelivery"
version:
ldGitVersion
mainClass:
LogosDeliveryInterface
initializeRequest:
StartAsClient
shutdownRequest:
Shutdown

View File

@ -1,6 +1,10 @@
#!fmt: off
import os
import std/strutils
# strip() is used in buildLibDynamicMac/buildLibStaticMac (above the original
# `import std/strutils` lower in this file); needed at the top when this nimble
# is evaluated as the package script while compiling ./logos_delivery.nim.
mode = ScriptMode.Verbose
### Package
@ -65,7 +69,7 @@ requires "https://github.com/logos-messaging/nim-ffi#v0.1.3"
requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#cf5ee65cc20211068d7191de7e5e177c0dc212fa"
requires "https://github.com/vacp2p/nim-lsquic.git#v0.5.1"
requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2"
@ -504,6 +508,49 @@ task liblogosdeliveryStaticLinux, "Generate bindings":
task liblogosdeliveryStaticMac, "Generate bindings":
buildLibStaticMac("liblogosdelivery", "library")
## New single-root Logos Delivery library (BrokerFfiApi).
## Builds ./logos_delivery.nim (registerBrokerLibrary name "logosdelivery") as a
## shared library with the full set of FFI wrappers (C + C++ headers always, plus
## Python, Rust and Go). Set the SRCGEN env var to also dump the broker-generated
## sources (-d:brokerDebug). Independent of the original liblogosdelivery targets.
task onelogosdelivery,
"Build the new single-root liblogosdelivery as a BrokerFfiApi library with all FFI wrappers (C/C++/Python/Rust/Go)":
let outdir = "build/onelogosdelivery"
if not dirExists(outdir):
mkDir(outdir)
# SRCGEN env var present -> persist the generated *.gen.nim under build/broker_debug/.
let srcGenFlag =
if existsEnv("SRCGEN"): " -d:brokerDebug" else: ""
let libExt =
when defined(windows): ".dll"
elif defined(macosx): ".dylib"
else: ".so"
# Full --out path keeps the shared lib in outdir (a bare basename lands in cwd).
exec "nim c" &
" -d:BrokerFfiApi" &
" -d:BrokerFfiApiGenPy -d:BrokerFfiApiGenRust -d:BrokerFfiApiGenGo" &
" --threads:on --app:lib --opt:speed --mm:refc" &
" -d:metrics -d:discv5_protocol_id=d5waku" &
" --nimMainPrefix:logosdelivery" & " --out:" & outdir & "/liblogosdelivery" & libExt &
srcGenFlag & " " & getMyCPU() & getNimParams() & " ./logos_delivery.nim"
# The generated FFI wrappers land next to the source (repo root): brokers v3.1.2's
# `-d:BrokerFfiApiOutDir` is referenced but never declared `{.strdefine.}`, so it
# cannot redirect them. Move them into outdir as a post-build step to keep the
# repo root clean.
for f in [
"logosdelivery.h", "logosdelivery.hpp", "logosdelivery.py", "logosdelivery.cddl",
"logosdeliveryConfig.cmake", "logosdeliveryConfigVersion.cmake",
]:
if fileExists(f):
mvFile(f, outdir & "/" & f)
for d in ["logosdelivery_rs", "logosdelivery_go"]:
if dirExists(d):
mvDir(d, outdir & "/" & d)
### Formatting tasks
task nphchanges, "Run nph on .nim/.nims/.nimble files changed on this branch/PR":

View File

@ -0,0 +1,182 @@
## KernelInterface — the libwaku operation set, exposed with the real Nim typed
## signatures behind each entry point (NOT the flat string/JSON FFI contract).
##
## WakuMessage / PubsubTopic / ContentTopic / StoreQueryRequest+Response cross
## the broker natively. libp2p identity / ENR values cross as their canonical
## string forms (they are ref/opaque and cannot be auto-serialised). Inbound
## relay/filter delivery is the `ReceivedMessage` event (replaces libwaku's
## set_event_callback for messages).
##
## Broker constraints (v3.1.0): every request returns `Result[T, string]` — the
## error channel is always `string`; the payload `T` must NOT be `void` (the mt
## codec cannot encode `void`), so "no value" requests return `Result[bool, string]`.
import std/options
import results, chronos
import brokers/broker_interface
import logos_delivery/api/types
import
logos_delivery/waku/waku_core, # PubsubTopic
logos_delivery/waku/waku_store/common # StoreQueryRequest, StoreQueryResponse
export types
BrokerInterface(API, KernelInterface):
EventBroker:
type ReceivedMessage = object
## Inbound relay/filter message delivery (replaces set_event_callback).
pubsubTopic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
RequestBroker:
proc buildContentTopic(
appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async.}
RequestBroker:
proc buildPubsubTopic(
topicName: string
): Future[Result[PubsubTopic, string]] {.async.}
RequestBroker:
proc defaultPubsubTopic(): Future[Result[PubsubTopic, string]] {.async.}
# --- relay ---
RequestBroker:
proc relayPublish(
pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async.}
RequestBroker:
proc relaySubscribe(
pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc relayUnsubscribe(
pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc relayAddProtectedShard(
clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc relayConnectedPeers(
pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc relayPeersInMesh(
pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.}
# --- filter ---
RequestBroker:
proc filterSubscribe(
pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc filterUnsubscribe(
pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc filterUnsubscribeAll(peer: string): Future[Result[bool, string]] {.async.}
# --- lightpush ---
RequestBroker:
proc lightpushPublish(
pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async.}
# --- store ---
RequestBroker:
proc storeQuery(
request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async.}
# --- peer management ---
RequestBroker:
proc connect(
peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc disconnectPeerById(peerId: string): Future[Result[bool, string]] {.async.}
RequestBroker:
proc disconnectAllPeers(): Future[Result[bool, string]] {.async.}
RequestBroker:
proc dialPeer(
peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc dialPeerById(
peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc peerIdsFromPeerstore(): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc connectedPeersInfo(): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc connectedPeers(): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc peerIdsByProtocol(
protocol: string
): Future[Result[seq[string], string]] {.async.}
# --- discovery ---
RequestBroker:
proc dnsDiscovery(
enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc discv5UpdateBootnodes(
bootnodes: seq[string]
): Future[Result[bool, string]] {.async.}
RequestBroker:
proc startDiscv5(): Future[Result[bool, string]] {.async.}
RequestBroker:
proc stopDiscv5(): Future[Result[bool, string]] {.async.}
RequestBroker:
proc peerExchangeRequest(numPeers: uint64): Future[Result[int, string]] {.async.}
# --- debug / info ---
RequestBroker:
proc version(): Future[Result[string, string]] {.async.}
RequestBroker:
proc listenAddresses(): Future[Result[seq[string], string]] {.async.}
RequestBroker:
proc myEnr(): Future[Result[string, string]] {.async.}
RequestBroker:
proc myPeerId(): Future[Result[string, string]] {.async.}
RequestBroker:
proc metrics(): Future[Result[string, string]] {.async.}
RequestBroker:
proc isOnline(): Future[Result[bool, string]] {.async.}
RequestBroker:
proc pingPeer(
peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.}

View File

@ -0,0 +1,51 @@
## LogosDeliveryInterface — the facade / mainClass interface.
##
## Owns the node and the three sub-interfaces; getters return them. This module
## imports AND re-exports the three sub-interface contracts so a consumer can
## `import ./LogosDeliveryInterface` and get the whole interface surface.
import results, chronos
import brokers/broker_interface
# Module aliases avoid the module-name vs type-name clash (file KernelInterface.nim
# exports type KernelInterface); the type names stay in scope unqualified for the getters.
import ./kernel_interface as ikernel_iface
import ./messaging_client_interface as imessagingclient_iface
import ./reliable_channel_manager_interface as ireliablechannelmanager_iface
export ikernel_iface, imessagingclient_iface, ireliablechannelmanager_iface
BrokerInterface(API, LogosDeliveryInterface):
EventBroker:
type ConnectionStatusChangeEvent* = object
connectionStatus*: ConnectionStatus
RequestBroker:
proc startAsNode(config: string): Future[Result[void, string]] {.async.}
RequestBroker:
proc startAsClient(
mode: WakuMode, preset: string
): Future[Result[MessagingClientInterface, string]] {.async.}
RequestBroker:
proc shutdown(): Future[Result[void, string]] {.async.}
RequestBroker:
proc stop(): Future[Result[void, string]] {.async.}
RequestBroker:
# Getters: return the owned sub-interface instances.
proc kernel(): Future[Result[KernelInterface, string]] {.async.}
RequestBroker:
proc messaging(): Future[Result[MessagingClientInterface, string]] {.async.}
RequestBroker:
proc channels(): Future[Result[ReliableChannelManagerInterface, string]] {.async.}
RequestBroker:
proc getNodeInfo(id: NodeInfoId): Future[Result[string, string]] {.async.}
RequestBroker:
proc getAvailableConfigs(): Future[Result[string, string]] {.async.}

View File

@ -0,0 +1,48 @@
## MessagingClientInterface — the messaging API (waku/api) minus createNode.
##
## Node creation lives in the facade (LogosDeliveryInterface); this interface exposes
## subscribe / unsubscribe / send only.
import results, chronos
import brokers/broker_interface
import logos_delivery/api/types
export types
BrokerInterface(API, MessagingClientInterface):
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
RequestBroker:
proc subscribe(contentTopic: ContentTopic): Future[Result[void, string]] {.async.}
RequestBroker:
proc unsubscribe(contentTopic: ContentTopic): Future[Result[void, string]] {.async.}
RequestBroker:
# Returns the RequestId in its string form. Named `sendMessage` (not `send`)
# because broker request verbs must be globally unique across all interfaces
# in the library, and ReliableChannelManagerInterface also has a send.
proc send(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.}

View File

@ -0,0 +1,48 @@
## ReliableChannelManagerInterface — create / close / send on a reliable channel,
## plus a MessageReceived event (bridged from the channel layer's own
## ChannelMessageReceivedEvent by the impl).
import results, chronos
import brokers/broker_interface
import logos_delivery/api/types
export types
BrokerInterface(API, ReliableChannelManagerInterface):
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string
RequestBroker:
# Returns the channel id of the created channel.
proc createReliableChannel(
channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID
): Future[Result[ChannelId, string]] {.async.}
RequestBroker:
proc closeChannel(channelId: ChannelId): Future[Result[void, string]] {.async.}
RequestBroker:
# Returns the RequestId in its string form. Named `sendOnChannel` (not `send`)
# for the global-verb-uniqueness reason noted on MessagingClientInterface.sendMessage.
proc sendOnChannel(
channelId: ChannelId, payload: seq[byte], ephemeral: bool
): Future[Result[RequestId, string]] {.async.}

View File

@ -0,0 +1,129 @@
## logos_delivery/api/types — shared API type definitions.
##
## These derived (non-builtin) types are referenced by the BrokerInterface
## contracts (LogosDeliveryInterface / MessagingClientInterface /
## ReliableChannelManagerInterface) and were elevated here from their original
## modules so the API surface has a single types home. Each original module now
## imports this module and re-exports the moved entity, so existing call sites
## are unaffected.
##
## NOTE (accepted layering inversion): `WakuMessage` and `ContentTopic` were
## physically moved out of `waku_core/`, so `waku_core/message` now depends on
## this module (which pulls in nim-sds via the channel id types).
{.push raises: [].}
import bearssl/rand, std/times, chronos
import stew/byteutils
import std/hashes
import
logos_delivery/waku/utils/requests as request_utils
# generateRequestId (used by RequestId.new); lost when the procs were
# consolidated here during the type-elevation refactor.
import
logos_delivery/waku/waku_core/time
# Timestamp: TODO: this needs to be elevated into interface level.
import types/sds_message_id # nim-sds: SdsChannelID, SdsParticipantID (leaf)
# SdsParticipantID (and SdsChannelID, needed by ChannelId) are NOT moved — they
# belong to nim-sds; they are imported and explicitly re-exported here.
export sds_message_id
type
ContentTopic* = string
RequestId* = distinct string
ConnectionStatus* {.pure.} = enum
Disconnected
PartiallyConnected
Connected
WakuMode* {.pure.} = enum
Core # full service node
Edge # client-only node
NodeInfoId* {.pure.} = enum
Version
Metrics
MyMultiaddresses
MyENR
MyPeerId
MyBoundPorts
MyMixPubKey
ChannelId* = SdsChannelID
MessageEnvelope* = object
contentTopic*: ContentTopic
payload*: seq[byte]
ephemeral*: bool
meta*: seq[byte]
## Opaque wire-format marker carried on the underlying WakuMessage.
## Higher layers (e.g. Reliable Channel) stamp this so peers can route
## ingress traffic to their corresponding layer. Empty by default.
WakuMessage* = object # Data payload transmitted.
payload*: seq[byte]
contentTopic*: ContentTopic ## content-based filtering identifier
meta*: seq[byte] ## application specific metadata
version*: uint32 ## payload-encryption discriminator (Whisper/WakuV1 compat)
timestamp*: Timestamp ## sender generated timestamp
ephemeral*: bool ## transient (not-to-be-stored) marker
proof*: seq[byte] ## RFC 17 spam-protection proof (rln-relay)
## ===== helpers =====
##
proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
proc `==`*(a, b: RequestId): bool {.inline.} =
string(a) == string(b)
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))
proc generateRequestId*(rng: ref HmacDrbgContext): RequestId =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return RequestId(byteutils.toHex(bytes))
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,
payload: seq[byte] | string,
ephemeral: bool = false,
meta: seq[byte] = @[],
): MessageEnvelope =
when payload is seq[byte]:
MessageEnvelope(
contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta
)
else:
MessageEnvelope(
contentTopic: contentTopic,
payload: payload.toBytes(),
ephemeral: ephemeral,
meta: meta,
)
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
## Convert a MessageEnvelope to a WakuMessage.
var wm = WakuMessage(
contentTopic: envelope.contentTopic,
payload: envelope.payload,
ephemeral: envelope.ephemeral,
meta: envelope.meta,
timestamp: getNowInNanosecondTime(),
)
return wm
{.pop.}

View File

@ -13,27 +13,7 @@ import logos_delivery/waku/events/message_events as waku_message_events
import brokers/event_broker
import ./types as channel_types
import logos_delivery/api/reliable_channel_manager_interface
export waku_message_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string
export
waku_message_events, channel_types, event_broker, reliable_channel_manager_interface

View File

@ -21,8 +21,8 @@ import bearssl/rand
import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/api/messaging_client_interface as mci
import logos_delivery/waku/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import ./events
@ -31,9 +31,7 @@ import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption
export types, events, segmentation, scalable_data_sync, rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
@ -44,14 +42,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## on breaking on-the-wire changes; implementations pin one version.
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral
@ -90,7 +80,7 @@ type
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
sendHandler: SendHandler
messagingClient: MessagingClientInterface
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
@ -129,13 +119,15 @@ proc stop*(self: ReliableChannel) {.async: (raises: []).} =
## Stops the SDS background loops. Persisted SDS state survives.
await self.sdsHandler.stop()
proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
proc tryFinalizeChannelReq(
self: ReliableChannel, channelReqId: RequestId
) {.async: (raises: []).} =
## Tries to finalize the channel-level request identified by `channelReqId` if
## certain conditions are met, i.e., no segments are still awaiting dispatch or in flight,
## and the total number of confirmed + failed segments equals the total expected segments.
## Therefore, the channel-level request is removed from `self.channelReqs`
## and the appropriate final event is emitted.
##
##
let state = self.channelReqs.getOrDefault(channelReqId)
if state.totalExpectedSegments == 0:
## Either already finalized (and removed) or never inserted.
@ -148,18 +140,15 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
self.channelReqs.del(channelReqId)
if state.failedCount > 0:
ChannelMessageErrorEvent.emit(
await ChannelMessageErrorEvent.emit(
self.brokerCtx,
ChannelMessageErrorEvent(
channelId: self.channelId,
requestId: channelReqId,
error: "one or more segments failed",
),
channelId = self.channelId,
requestId = channelReqId,
error = "one or more segments failed",
)
else:
ChannelMessageSentEvent.emit(
self.brokerCtx,
ChannelMessageSentEvent(channelId: self.channelId, requestId: channelReqId),
await ChannelMessageSentEvent.emit(
self.brokerCtx, channelId = self.channelId, requestId = channelReqId
)
type ClaimedSegment = object
@ -184,7 +173,7 @@ type MessagingOutcome {.pure.} = enum
proc onMessageFinal(
self: ReliableChannel, messagingReqId: RequestId, outcome: MessagingOutcome
) =
) {.async.} =
for channelReqId, state in self.channelReqs.mpairs:
let idx = state.inflightMessagingIds.find(messagingReqId)
if idx < 0:
@ -195,17 +184,19 @@ proc onMessageFinal(
state.confirmedCount.inc()
of MessagingOutcome.Failed:
state.failedCount.inc()
self.tryFinalizeChannelReq(channelReqId)
await self.tryFinalizeChannelReq(channelReqId)
return
proc markSegmentFailed(self: ReliableChannel, channelReqId: RequestId) =
proc markSegmentFailed(
self: ReliableChannel, channelReqId: RequestId
) {.async: (raises: []).} =
try:
self.channelReqs[channelReqId].failedCount.inc()
except KeyError as e:
error "unreachable: channelReqId not found in markSegmentFailed",
channelReqId = $channelReqId, error = e.msg
return
self.tryFinalizeChannelReq(channelReqId)
await self.tryFinalizeChannelReq(channelReqId)
proc markSegmentInflight(
self: ReliableChannel, channelReqId: RequestId, messagingReqId: RequestId
@ -248,14 +239,15 @@ proc onReadyToSend(
## clear and encrypt only the application payload.
let encRes = await Encrypt.request(m)
let encrypted = encRes.valueOr:
MessageErrorEvent.emit(
### TODO: Emitting of events from another layer is not completly ok to do so.
await mci.MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: channelReqId, messageHash: "", error: "encryption failed: " & error
),
requestId = channelReqId,
messageHash = "",
error = "encryption failed: " & error,
)
## Encryption failed *before* we could hand the segment to the
self.markSegmentFailed(channelReqId)
await self.markSegmentFailed(channelReqId)
continue
let wireBytes = seq[byte](encrypted)
@ -269,25 +261,24 @@ proc onReadyToSend(
meta: LipWireReliableChannelVersion.toBytes(),
)
## `sendHandler` is not annotated `(raises: [])`, but this listener is.
## `messagingClient.send` is not annotated `(raises: [])`, but this listener is.
## Convert any raise to a Result error so the state machine handles
## both failure modes (Result.err and exception) through one path.
let sendRes =
try:
await self.sendHandler(envelope)
await self.messagingClient.send(envelope)
except CatchableError as e:
Result[RequestId, string].err("messaging send raised: " & e.msg)
let messagingReqId = sendRes.valueOr:
MessageErrorEvent.emit(
### TODO: Emitting of events from another layer is not completly ok to do so.
await mci.MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: channelReqId,
messageHash: "",
error: "messaging send failed: " & error,
),
requestId = channelReqId,
messageHash = "",
error = "messaging send failed: " & error,
)
self.markSegmentFailed(channelReqId)
await self.markSegmentFailed(channelReqId)
continue
self.markSegmentInflight(channelReqId, messagingReqId)
@ -390,13 +381,12 @@ proc onMessageReceived(
## the `Decrypt` request broker.
let decRes = await Decrypt.request(payload)
let plaintext = decRes.valueOr:
MessageErrorEvent.emit(
### TODO: Emitting of events from another layer is not completly ok to do so.
await mci.MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: RequestId(""),
messageHash: messageHash,
error: "decryption failed: " & error,
),
requestId = RequestId(""),
messageHash = messageHash,
error = "decryption failed: " & error,
)
return
let plaintextBytes = seq[byte](plaintext)
@ -407,13 +397,11 @@ proc onMessageReceived(
## marker/contentTopic filter already ran, so surface it as an error event
## rather than dropping it silently.
let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr:
MessageErrorEvent.emit(
await mci.MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: RequestId(""),
messageHash: messageHash,
error: "SDS handleIncoming failed: " & error,
),
requestId = RequestId(""),
messageHash = messageHash,
error = "SDS handleIncoming failed: " & error,
)
return
for content in deliverable:
@ -421,7 +409,7 @@ proc onMessageReceived(
proc new*(
T: type ReliableChannel,
sendHandler: SendHandler,
messagingClient: MessagingClientInterface,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
@ -441,7 +429,7 @@ proc new*(
## typically constructs it as a closure over `MessagingClient.send`. Tests
## pass a fake to drive the send state machine without touching the network.
let chn = T(
sendHandler: sendHandler,
messagingClient: messagingClient,
channelId: channelId,
contentTopic: contentTopic,
senderId: senderId,
@ -495,9 +483,9 @@ proc new*(
chn.onMessageFinal(evt.requestId, MessagingOutcome.Sent),
)
discard MessageErrorEvent.listen(
discard mci.MessageErrorEvent.listen(
chn.brokerCtx,
proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} =
proc(evt: mci.MessageErrorEvent): Future[void] {.async: (raises: []).} =
chn.onMessageFinal(evt.requestId, MessagingOutcome.Failed),
)

View File

@ -11,10 +11,11 @@ import chronos
import chronicles
import stew/byteutils
import brokers/broker_context
import brokers/broker_implement
import logos_delivery/api/messaging_client_interface
import logos_delivery/api/reliable_channel_manager_interface
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
@ -27,33 +28,13 @@ const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
type ReliableChannelManager* = ref object
type ReliableChannelManager* = ref object of ReliableChannelManagerInterface
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## Borrowed from the owning `Waku`.
sendHandler: SendHandler
messagingClient: MessagingClientInterface
## Borrowed from the owning `Waku`.
## Default egress dispatch for channels created through this manager.
## Constructed at mount time as a closure over `MessagingClient.send`
## so the channel layer itself stays callable-only.
brokerCtx: BrokerContext
proc new*(
T: type ReliableChannelManager,
messagingClient: MessagingClient,
sendHandler: SendHandler,
brokerCtx: BrokerContext = globalBrokerContext(),
): Result[T, string] =
if messagingClient.isNil():
return err("messaging client is required")
if sendHandler.isNil():
return err("sendHandler is required")
return ok(
T(
channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
sendHandler: sendHandler,
brokerCtx: brokerCtx,
)
)
proc start*(self: ReliableChannelManager): Result[void, string] =
## Placeholder: per-channel listeners are installed in `ReliableChannel.new`,
@ -80,83 +61,86 @@ proc sdsPersistence(): Option[Persistence] =
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
BrokerImplement ReliableChannelManager of ReliableChannelManagerInterface:
proc new(
T: typedesc[ReliableChannelManager], messagingClient: MessagingClientInterface
): T =
T(
channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
method createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
): Future[Result[ChannelId, string]] {.async.} =
## Spec entry point. The`rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
let chn = ReliableChannel.new(
messagingClient = self.messagingClient,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()
method closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async.} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return await chn.send(appPayload, ephemeral)
method sendOnChannel(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async.} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return chn.send(appPayload, ephemeral)
## Inbound messages are not handed to the manager by direct call. Each
## `ReliableChannel` installs its own `MessageReceivedEvent` listener

View File

@ -1,15 +1,8 @@
## Core identifier types for the Reliable Channel API.
import std/hashes
import logos_delivery/waku/api/types as api_types
import logos_delivery/api/types as api_types
import ./scalable_data_sync/scalable_data_sync
export scalable_data_sync
export api_types
type ChannelId* = SdsChannelID
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))

View File

@ -0,0 +1,215 @@
## LogosDelivery — the LogosDeliveryInterface facade implementation.
##
## Owns a `Waku` (node + messagingClient + reliableChannelManager) and exposes
## the three sub-interfaces through cached getters. Mirrors the persistence
## example's `PersistenceImpl` (nim-brokers/examples/persistence/nimlib): a
## `ref object of <iface>` with `BrokerImplement`, sub-instances built on a
## `newInstanceCtx(self.brokerCtx)`, plus a `provideFactory`.
##
## Scope (decided): `initializeRequest(configPath)` creates + mounts the node but
## does NOT start it. `kernel()` is a stub until a KernelImpl exists.
import results, chronos
import std/options # some()/Option for WakuNodeConf.mode
import std/json # newJArray/newJObject/%*/pretty in getAvailableConfigs
import brokers/broker_context, brokers/broker_interface, brokers/broker_implement
import logos_delivery/api/logos_delivery_interface as logosdelivery_iface
import logos_delivery/api/types as api_types
import logos_delivery/api/kernel_interface as kernel_iface
import
logos_delivery/waku/factory/waku,
logos_delivery/waku/factory/waku_state_info,
logos_delivery/messaging/messaging_client,
logos_delivery/channels/reliable_channel_manager
# Waku, getNodeInfoItem, mountMessagingClient, mountReliableChannelManager, stop
import tools/confutils/[cli_args, config_option_meta] # WakuNodeConf (+ .load)
type LogosDelivery* = ref object of LogosDeliveryInterface
kernelI: KernelInterface ## lazily-built sub-interface caches (keep them reachable)
waku: Waku ## the owned node facade (built in initializeRequest)
messagingClient: MessagingClient
reliableChannelManager: ReliableChannelManager
proc loadConf(configPath: string): Result[WakuNodeConf, string] =
## Delegates to cli_args' concrete loader so confutils' `load` macro expands in
## cli_args' full scope (avoids leaking undeclared identifiers / WakuMode clash here).
loadWakuNodeConfFromFile(configPath)
proc createNode(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} =
let wakuConf = conf.toWakuConf().valueOr:
return err("Failed to handle the configuration: " & error)
## We are not defining app callbacks at node creation
let wakuRes = (await Waku.new(wakuConf)).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)
return ok(wakuRes)
proc initMessagingClient(self: LogosDelivery): Result[MessagingClient, string] =
let subCtx = newInstanceCtx(self.brokerCtx)
return ok(
MessagingClient.createUnderContext(
subCtx, self.waku.conf.p2pReliability, self.waku.node
)
)
proc initReliableChannelManager(
self: LogosDelivery
): Result[ReliableChannelManager, string] =
let subCtx = newInstanceCtx(self.brokerCtx)
return ok(ReliableChannelManager.createUnderContext(subCtx, self.messagingClient))
BrokerImplement LogosDelivery of LogosDeliveryInterface:
method kernel(
self: LogosDelivery
): Future[Result[KernelInterface, string]] {.async.} =
if self.waku.isNil():
return err("not initialized; call startAsClient first")
return err("kernel not yet implemented")
method messaging(
self: LogosDelivery
): Future[Result[MessagingClientInterface, string]] {.async.} =
if self.waku.isNil():
return err("not initialized; call startAsClient first")
if self.messagingClient.isNil():
return err("messaging client not mounted")
return ok(MessagingClientInterface(self.messagingClient))
method channels(
self: LogosDelivery
): Future[Result[ReliableChannelManagerInterface, string]] {.async.} =
if self.waku.isNil():
return err("not initialized; call startAsClient first")
if self.messagingClient.isNil():
return err("not initialized; call startAsClient first")
if self.reliableChannelManager.isNil():
self.reliableChannelManager = initReliableChannelManager(self).valueOr:
return err("failed to initialize ReliableChannelManager: " & error)
self.reliableChannelManager.start().isOkOr:
return err("failed to start ReliableChannelManager: " & error)
return ok(ReliableChannelManagerInterface(self.reliableChannelManager))
method startAsNode(
self: LogosDelivery, config: string
): Future[Result[void, string]] {.async.} =
if not self.waku.isNil():
return err("already initialized")
let conf = loadConf(config).valueOr:
return err("failed to load config: " & error)
# The restamp forces {.async: (raises: []).}, but createNode can raise.
try:
self.waku = (await createNode(conf)).valueOr:
return err("failed to create node: " & error)
(await self.waku.start()).isOkOr:
return err("failed to start node: " & $error)
return ok()
except CatchableError as e:
return err("initialize failed: " & e.msg)
method startAsClient(
self: LogosDelivery, mode: api_types.WakuMode, preset: string
): Future[Result[MessagingClientInterface, string]] {.async.} =
if not self.messagingClient.isNil():
return err("already initialized")
if not self.waku.isNil():
return err(
"already started as node; cannot start as client, but you can use as client"
)
try:
var conf: WakuNodeConf = ?defaultWakuNodeConf()
conf.mode = some(mode)
conf.preset = preset
self.waku = (await createNode(conf)).valueOr:
return err("failed to create node: " & error)
self.messagingClient = initMessagingClient(self).valueOr:
return err("failed to mount messaging client: " & error)
(await self.waku.start()).isOkOr:
return err("failed to start node: " & $error)
self.messagingClient.start().isOkOr:
return err("failed to start messaging client: " & $error)
return ok(MessagingClientInterface(self.messagingClient))
except CatchableError as e:
return err("initialize failed: " & e.msg)
method stop(self: LogosDelivery): Future[Result[void, string]] {.async.} =
if not self.reliableChannelManager.isNil():
try:
await self.reliableChannelManager.stop()
self.reliableChannelManager.close()
self.reliableChannelManager = nil
except CatchableError as e:
return err("ReliableChannelManager stop failed: " & e.msg)
if not self.messagingClient.isNil():
try:
await self.messagingClient.stop()
self.messagingClient.close()
self.messagingClient = nil
except CatchableError as e:
return err("MessagingClient stop failed: " & e.msg)
if not self.waku.isNil():
try:
(await self.waku.stop()).isOkOr:
return err("Node stop failed: " & $error)
# `Waku` is a plain ref object (not a BrokerImplement) — `stop()` is its
# only teardown; there is no `close()`.
self.waku = nil
except CatchableError as e:
return err("Node stop failed: " & e.msg)
return ok()
method shutdown(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return await self.stop()
method getNodeInfo(
self: LogosDelivery, id: NodeInfoId
): Future[Result[string, string]] {.async.} =
return ok(self.waku.stateInfo.getNodeInfoItem(id))
method getAvailableConfigs(
self: LogosDelivery
): Future[Result[string, string]] {.async.} =
let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf)
var configOptionDetails = newJArray()
for meta in optionMetas:
configOptionDetails.add(
%*{
meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")",
"desc": meta.desc,
}
)
var jsonNode = newJObject()
jsonNode["configOptions"] = configOptionDetails
let asString = pretty(jsonNode)
return ok(pretty(jsonNode))
# FFI factory registration
proc setupProviders(ctx: BrokerContext): Result[void, string] =
## Called by registerBrokerLibrary on the processing thread: construct the
## main facade impl adopting the FFI context, wiring its providers under `ctx`.
discard LogosDelivery.createUnderContext(ctx)
ok()
# DI factory registration: non ffi - nim lib users needs it.
LogosDeliveryInterface.provideFactory(
proc(): Result[LogosDeliveryInterface, string] {.gcsafe.} =
ok(LogosDeliveryInterface(LogosDelivery.createUnderContext(NewBrokerContext())))
)

View File

@ -6,6 +6,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/messaging_client_interface
import
logos_delivery/waku/[
waku_core,
@ -78,7 +79,7 @@ proc getMissingMsgsFromStore(
proc processIncomingMessage(
self: RecvService, pubsubTopic: string, message: WakuMessage
): bool =
): Future[bool] {.async.} =
## Return false if the incoming message is from a non-subscribed topic,
## or if the message is a duplicate (recently-seen). Otherwise, save it as
## recently-seen, emit a MessageReceivedEvent, and return true.
@ -100,7 +101,7 @@ proc processIncomingMessage(
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
self.recentReceivedMsgs.add(rxMsg)
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
await MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
return true
proc checkStore*(self: RecvService) {.async.} =
@ -143,7 +144,7 @@ proc checkStore*(self: RecvService) {.async.} =
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
if missingMsgsRet.isOk():
for msgTuple in missingMsgsRet.get():
if self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg):
if await self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg):
info "recv service store-recovered message",
msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic
else:

View File

@ -5,6 +5,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[sequtils, tables, options, typetraits]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/messaging_client_interface
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
logos_delivery/waku/[
@ -170,14 +171,14 @@ proc checkStoredMessages(self: SendService) {.async.} =
await self.checkMsgsInStore(tasksToValidate)
proc reportTaskResult(self: SendService, task: DeliveryTask) =
proc reportTaskResult(self: SendService, task: DeliveryTask) {.async.} =
case task.state
of DeliveryState.SuccessfullyPropagated:
# TODO: in case of unable to strore check messages shall we report success instead?
if not task.propagateEventEmitted:
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
MessagePropagatedEvent.emit(
await MessagePropagatedEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.to0xHex()
)
task.propagateEventEmitted = true
@ -185,14 +186,14 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
of DeliveryState.SuccessfullyValidated:
info "Message successfully sent",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex())
await MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex())
return
of DeliveryState.FailedToDeliver:
error "Failed to send message",
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
error = task.errorDesc
MessageErrorEvent.emit(
await MessageErrorEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.to0xHex(), task.errorDesc
)
return
@ -207,15 +208,16 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
error = "Message too old",
age = task.messageAge()
task.state = DeliveryState.FailedToDeliver
MessageErrorEvent.emit(
await MessageErrorEvent.emit(
self.brokerCtx,
task.requestId,
task.msgHash.to0xHex(),
"Unable to send within retry time window",
)
proc evaluateAndCleanUp(self: SendService) =
self.taskCache.forEach(self.reportTaskResult(it))
proc evaluateAndCleanUp(self: SendService) {.async.} =
for task in self.taskCache:
await self.reportTaskResult(task)
self.taskCache.keepItIf(
it.state != DeliveryState.SuccessfullyValidated and
it.state != DeliveryState.FailedToDeliver
@ -241,7 +243,7 @@ proc serviceLoop(self: SendService) {.async.} =
while true:
await self.trySendMessages()
await self.checkStoredMessages()
self.evaluateAndCleanUp()
await self.evaluateAndCleanUp()
## TODO: add circuit breaker to avoid infinite looping in case of persistent failures
## Use OnlineStateChange observers to pause/resume the loop
await sleepAsync(ServiceLoopInterval)
@ -264,6 +266,6 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
contentTopic = task.msg.contentTopic, error = error
await self.sendProcessor.process(task)
reportTaskResult(self, task)
await reportTaskResult(self, task)
if task.state != DeliveryState.FailedToDeliver:
self.addTask(task)

View File

@ -1,24 +1,19 @@
import results, chronos
import chronicles
import brokers/broker_implement
import logos_delivery/api/messaging_client_interface
import
logos_delivery/waku/api/types,
logos_delivery/waku/node/[waku_node, subscription_manager],
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
type MessagingClient* = ref object
type MessagingClient* = ref object of MessagingClientInterface
node: WakuNode
sendService*: SendService
recvService*: RecvService
started: bool
proc new*(
T: type MessagingClient, useP2PReliability: bool, node: WakuNode
): Result[T, string] =
let sendService = ?SendService.new(useP2PReliability, node)
let recvService = RecvService.new(node)
ok(T(node: node, sendService: sendService, recvService: recvService))
proc start*(self: MessagingClient): Result[void, string] =
if self.started:
return ok()
@ -34,26 +29,45 @@ proc stop*(self: MessagingClient) {.async.} =
await self.recvService.stopRecvService()
self.started = false
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
BrokerImplement MessagingClient of MessagingClientInterface:
proc new*(T: typedesc[MessagingClient], useP2PReliability: bool, node: WakuNode): T =
let sendService = SendService.new(useP2PReliability, node).valueOr:
error "Failed to initialize SendService", error = error
quit(QuitFailure)
let requestId = RequestId.new(self.node.rng)
let recvService = RecvService.new(node)
T(node: node, sendService: sendService, recvService: recvService, started: false)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
method subscribe(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
return self.node.subscriptionManager.subscribe(contentTopic)
asyncSpawn self.sendService.send(deliveryTask)
method unsubscribe(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
return self.node.subscriptionManager.unsubscribe(contentTopic)
return ok(requestId)
method send(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -5,6 +5,10 @@ import chronicles, chronos, libp2p/peerid, results
import logos_delivery/waku/factory/waku
import logos_delivery/messaging/messaging_client
import logos_delivery/api/messaging_client_interface
# brings the interface `send` method into scope: the impl's `method send` in the
# BrokerImplement block is not exported, so the call below dispatches through the
# MessagingClientInterface method instead.
import logos_delivery/waku/[requests/health_requests, waku_core, waku_node]
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/node/subscription_manager

View File

@ -81,9 +81,10 @@ const TheWakuNetworkPreset* = ProtocolsConfig(
),
)
type WakuMode* {.pure.} = enum
Edge
Core
# WakuMode was elevated to logos_delivery/api/types; re-exported here so
# existing call sites are unaffected.
from logos_delivery/api/types import WakuMode
export WakuMode
type NodeConfig* {.
requiresInit, deprecated: "Use WakuNodeConf from tools/confutils/cli_args instead"

View File

@ -2,77 +2,8 @@ import logos_delivery/waku/compat/option_valueor
import libp2p/crypto/crypto
{.push raises: [].}
import bearssl/rand, std/times, chronos
import stew/byteutils
import logos_delivery/waku/utils/requests as request_utils
import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time]
import logos_delivery/waku/requests/requests
type
MessageEnvelope* = object
contentTopic*: ContentTopic
payload*: seq[byte]
ephemeral*: bool
meta*: seq[byte]
## Opaque wire-format marker carried on the underlying WakuMessage.
## Higher layers (e.g. Reliable Channel) stamp this so peers can route
## ingress traffic to their corresponding layer. Empty by default.
RequestId* = distinct string
ConnectionStatus* {.pure.} = enum
Disconnected
PartiallyConnected
Connected
proc new*(T: typedesc[RequestId], rng: crypto.Rng): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
proc `==`*(a, b: RequestId): bool {.inline.} =
string(a) == string(b)
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,
payload: seq[byte] | string,
ephemeral: bool = false,
meta: seq[byte] = @[],
): MessageEnvelope =
when payload is seq[byte]:
MessageEnvelope(
contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta
)
else:
MessageEnvelope(
contentTopic: contentTopic,
payload: payload.toBytes(),
ephemeral: ephemeral,
meta: meta,
)
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
## Convert a MessageEnvelope to a WakuMessage.
var wm = WakuMessage(
contentTopic: envelope.contentTopic,
payload: envelope.payload,
ephemeral: envelope.ephemeral,
meta: envelope.meta,
timestamp: getNowInNanosecondTime(),
)
## TODO: First find out if proof is needed at all
## Follow up: left it to the send logic to add RLN proof if needed and possible
# let requestedProof = (
# waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat())
# ).valueOr:
# warn "Failed to add RLN proof to WakuMessage: ", error = error
# return wm
# wm.proof = requestedProof.proof
return wm
# logos_delivery/api/types; re-exported here so existing call sites are unaffected.
import logos_delivery/api/types
export types
{.pop.}

View File

@ -8,7 +8,7 @@ export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
type ConnectionStatusChangeEvent* = object
connectionStatus*: ConnectionStatus
# Notify health changes to a subscribed topic
@ -16,12 +16,12 @@ EventBroker:
# from/to content topic is provided in the new API (so we know which
# content topics are of interest to the application)
EventBroker:
type EventContentTopicHealthChange* = object
type ContentTopicHealthChangeEvent* = object
contentTopic*: ContentTopic
health*: TopicHealth
# Notify health changes to a shard (pubsub topic)
EventBroker:
type EventShardTopicHealthChange* = object
type ShardTopicHealthChangeEvent* = object
topic*: PubsubTopic
health*: TopicHealth

View File

@ -1,31 +1,10 @@
import brokers/event_broker
import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics]
export types
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
from logos_delivery/api/messaging_client_interface import
MessageSentEvent, MessageErrorEvent, MessagePropagatedEvent, MessageReceivedEvent
export
types, MessageSentEvent, MessageErrorEvent, MessagePropagatedEvent,
MessageReceivedEvent
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol

View File

@ -384,35 +384,6 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return
proc mountMessagingClient*(waku: Waku): Result[void, string] =
if not waku.messagingClient.isNil():
return err("messaging client already mounted")
if waku.node.started:
return err("cannot mount messaging client on a started node")
waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr:
return err("could not create messaging client: " & $error)
return ok()
proc mountReliableChannelManager*(waku: Waku): Result[void, string] =
if not waku.reliableChannelManager.isNil():
return err("reliable channel manager already mounted")
if waku.messagingClient.isNil():
return err("reliable channel manager requires a mounted messaging client")
if waku.node.started:
return err("cannot mount reliable channel manager on a started node")
let messagingClient = waku.messagingClient
let defaultSendHandler: SendHandler = proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await messagingClient.send(envelope)
waku.reliableChannelManager = ReliableChannelManager.new(
messagingClient, defaultSendHandler, waku.brokerCtx
).valueOr:
return err("could not create reliable channel manager: " & $error)
return ok()
proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if waku.node.started:
warn "start: waku node already started"

View File

@ -8,18 +8,13 @@ import std/[tables, sequtils, strutils]
import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid, stew/byteutils
import logos_delivery/waku/[waku_node, net/bound_ports]
type
NodeInfoId* {.pure.} = enum
Version
Metrics
MyMultiaddresses
MyENR
MyPeerId
MyBoundPorts
MyMixPubKey
# NodeInfoId was elevated to logos_delivery/api/types; re-exported here so
# existing call sites are unaffected.
from logos_delivery/api/types import NodeInfoId
export NodeInfoId
WakuStateInfo* {.requiresInit.} = object
node: WakuNode
type WakuStateInfo* {.requiresInit.} = object
node: WakuNode
proc getAllPossibleInfoItemIds*(self: WakuStateInfo): seq[NodeInfoId] =
## Returns all possible options that can be queried to learn about the node's information.

View File

@ -51,7 +51,7 @@ type NodeHealthMonitor* = ref object
## if it doesn't make sense for the protocol in question, this is set to zero.
relayObserver: PubSubObserver
peerEventListener: WakuPeerEventListener
shardHealthListener: EventShardTopicHealthChangeListener
shardHealthListener: ShardTopicHealthChangeEventListener
eventLoopLagExceeded: bool
## set to true when the chronos event loop lag exceeds the severe threshold,
## causing the node health to be reported as EVENT_LOOP_LAGGING until lag recovers.
@ -513,7 +513,7 @@ proc healthLoop(hm: NodeHealthMonitor) {.async.} =
hm.connectionStatus = newConnectionStatus
EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus)
ConnectionStatusChangeEvent.emit(hm.node.brokerCtx, newConnectionStatus)
if not isNil(hm.onConnectionStatusChange):
await hm.onConnectionStatusChange(newConnectionStatus)
@ -690,10 +690,10 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
).valueOr:
return err("Failed to subscribe to peer events: " & error)
hm.shardHealthListener = EventShardTopicHealthChange.listen(
hm.shardHealthListener = ShardTopicHealthChangeEvent.listen(
hm.node.brokerCtx,
proc(
evt: EventShardTopicHealthChange
evt: ShardTopicHealthChangeEvent
): Future[void] {.async: (raises: []), gcsafe.} =
hm.healthUpdateEvent.fire(),
).valueOr:
@ -728,7 +728,7 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
await hm.eventLoopMonitorFut.cancelAndWait()
await WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener)
await EventShardTopicHealthChange.dropListener(
await ShardTopicHealthChangeEvent.dropListener(
hm.node.brokerCtx, hm.shardHealthListener
)

View File

@ -314,7 +314,7 @@ proc updateShardHealth(
let newHealth = toTopicHealth(state.peers.len)
if newHealth != state.currentHealth:
state.currentHealth = newHealth
EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
ShardTopicHealthChangeEvent.emit(self.node.brokerCtx, shard, newHealth)
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
## Remove a peer from edgeFilterSubStates for the given shard,

View File

@ -5,25 +5,9 @@
{.push raises: [].}
import ../topics, ../time
# WakuMessage was elevated to logos_delivery/api/types; re-exported here so
# existing call sites are unaffected.
from logos_delivery/api/types import WakuMessage
export WakuMessage
const MaxMetaAttrLength* = 64 # 64 bytes
type WakuMessage* = object # Data payload transmitted.
payload*: seq[byte]
# String identifier that can be used for content-based filtering.
contentTopic*: ContentTopic
# Application specific metadata.
meta*: seq[byte]
# Number to discriminate different types of payload encryption.
# Compatibility with Whisper/WakuV1.
version*: uint32
# Sender generated timestamp.
timestamp*: Timestamp
# The ephemeral attribute indicates signifies the transient nature of the
# message (if the message should be stored).
ephemeral*: bool
# Part of RFC 17: https://rfc.vac.dev/spec/17/
# The proof attribute indicates that the message is not spam. This
# attribute will be used in the rln-relay protocol.
proof*: seq[byte]

View File

@ -12,7 +12,10 @@ export parsing
## Content topic
type ContentTopic* = string
# ContentTopic was elevated to logos_delivery/api/types; re-exported here so
# existing call sites are unaffected.
from logos_delivery/api/types import ContentTopic
export ContentTopic
const DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto")

View File

@ -503,7 +503,7 @@ proc topicsHealthLoop(w: WakuRelay) {.async.} =
w.topicsHealth[topic] = currentHealth
EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth)
ShardTopicHealthChangeEvent.emit(w.brokerCtx, topic, currentHealth)
if not w.onTopicHealthChange.isNil():
futs.add(w.onTopicHealthChange(topic, currentHealth))

395
nim_brokers_instructions.md Normal file
View File

@ -0,0 +1,395 @@
# Working with `nim-brokers`
> Drop-in CLAUDE.md addon for any project that depends on the `brokers` nimble
> package. Type-safe, decoupled messaging on top of **chronos** + **results**.
> All public APIs are exception-free: errors ride `Result[T, string]`, never raises.
## Mental model
Three macros, each declares a **broker type** and generates its full API. The
type *is* the channel — you call class-method-style on the typedesc: `T.emit`,
`T.request`, `T.listen`, `T.setProvider`. No instances, no singletons to wire.
| Macro | Pattern | Producer side | Consumer side |
|-------|---------|---------------|---------------|
| `EventBroker` | pub/sub, many→many, fire-and-forget | `T.emit(...)` | `T.listen(handler)` |
| `RequestBroker` | request/response, **single** provider | `T.setProvider(handler)` | `T.request(...)` |
| `MultiRequestBroker` | request/response, **many** providers, fan-out | `T.setProvider(handler)` (N×) | `T.request(...)` |
`(mt)` suffix → multi-thread variant (cross-thread dispatch). `(sync)` on
RequestBroker → blocking, non-async. `(API)` → FFI shared-library surface.
Import only what you use:
```nim
import brokers/event_broker
import brokers/request_broker
import brokers/multi_request_broker
import brokers/broker_context # only if you need explicit contexts
```
---
## EventBroker — pub/sub
```nim
import chronos, brokers/event_broker
EventBroker:
type UserLoggedIn = object
userId*: int
name*: string
# listen returns Result[ListenerHandle, string]; keep the handle to drop later
let h = UserLoggedIn.listen(
proc(evt: UserLoggedIn): Future[void] {.async: (raises: []).} =
info "login", id = evt.userId
)
UserLoggedIn.emit(UserLoggedIn(userId: 7, name: "zoli")) # by value
UserLoggedIn.emit(userId = 7, name = "zoli") # by fields (inline-object only)
await UserLoggedIn.dropListener(h.get()) # drop one — cancels its in-flight work
await UserLoggedIn.dropAllListeners() # drop all for this context
```
- `emit` is **sync** here (single-thread): snapshots listeners, `asyncSpawn`s
each. It does not await delivery — `await sleepAsync(0)` or yield to flush in tests.
- Handlers MUST be `{.async: (raises: []).}`. Swallow your own exceptions.
- `dropListener`/`dropAllListeners` are `async` and **cancel** in-flight handlers
before returning — safe teardown point before releasing resources.
### Payload variants
```nim
EventBroker:
type Tick = void # payload-less signal: Tick.emit() / listen(proc(): Future[void]...)
EventBroker:
type Score = int # native/alias/external types auto-wrapped in distinct
EventBroker:
type Blob = ref object # ref payloads fine
data*: seq[byte]
```
---
## RequestBroker — single provider request/response
Two declaration styles. **Coupled** (named `type` + `proc`) and **proc-sugar**
(payload decoupled, broker named after the Capitalized verb).
```nim
import chronos, brokers/request_broker
# Coupled: broker name == type name == request() return payload
RequestBroker:
type FetchUser = object
name*: string
proc signature*(id: int): Future[Result[FetchUser, string]] {.async.}
FetchUser.setProvider(
proc(id: int): Future[Result[FetchUser, string]] {.async.} =
ok(FetchUser(name: "u" & $id))
).isOk()
let r = await FetchUser.request(42) # Result[FetchUser, string]
FetchUser.clearProvider()
```
```nim
# Proc-sugar: broker = Capitalized verb, request() returns the RAW payload
RequestBroker:
proc getVersion(): Future[Result[string, string]] {.async.} # -> broker `GetVersion`
GetVersion.setProvider(
proc(): Future[Result[string, string]] {.async.} = ok("1.2.3")).get()
let v = await GetVersion.request() # r.value is plain string, no unwrap
```
Rules & behaviors:
- **One provider per signature.** A second `setProvider` returns `err(...)` (no
silent override). `clearProvider()` first to swap.
- Two signature slots coexist: zero-arg and arg-based (overload by arity).
- Provider exceptions are caught → `err(<msg>)`. Unset provider → `err(...)`.
- `isProvided()` checks registration. `T.request` is `async` here.
### Sync mode — no event loop needed
```nim
RequestBroker(sync):
proc getId(): Result[int, string] # note: no Future, no {.async.}
GetId.setProvider(proc(): Result[int, string] = ok(42)).isOk()
let id = GetId.request() # blocking, returns Result directly
```
### void payload (action with no return value)
```nim
RequestBroker:
proc doReset(force: bool): Future[Result[void, string]] {.async.}
DoReset.setProvider(proc(force: bool): Future[Result[void, string]] {.async.} =
if force: ok() else: err("need force")).isOk()
```
---
## MultiRequestBroker — fan-out to many providers
Async only. `request()` calls **all** providers via `allFinished`, returns
`Result[seq[Payload], string]`. Any provider failing fails the whole request.
```nim
import chronos, brokers/multi_request_broker
MultiRequestBroker:
type Quote = object
price*: int
proc signature*(sym: string): Future[Result[Quote, string]] {.async.}
discard Quote.setProvider(proc(sym: string): Future[Result[Quote, string]] {.async.} =
ok(Quote(price: 100)))
discard Quote.setProvider(proc(sym: string): Future[Result[Quote, string]] {.async.} =
ok(Quote(price: 101)))
let all = await Quote.request("BTC") # all.get() is seq[Quote], len == 2
Quote.removeProvider(handle.get()) # remove one (handle from setProvider)
Quote.clearProviders() # remove all
```
- No providers registered → `ok(@[])` (empty, not error).
- Identical handler refs deduplicated on registration.
- `setProvider` returns `Result[ProviderHandle, string]`; capture it for `removeProvider`.
---
## BrokerContext — scoping / multi-instance
Every API takes an **optional first `BrokerContext` arg**. Omit it → the
thread-global context (`DefaultBrokerContext`). Use contexts to run independent
broker instances (per component, per test, per thread).
```nim
import brokers/broker_context
let ctx = NewBrokerContext() # globally-unique id (atomic)
discard MyEvent.listen(ctx, handler)
MyEvent.emit(ctx, payload)
FetchUser.setProvider(ctx, provider)
let r = await FetchUser.request(ctx, 42)
await MyEvent.dropAllListeners(ctx)
```
Thread setup helpers (callable before the event loop starts):
| Call | Use |
|------|-----|
| `setThreadBrokerContext(ctx)` | adopt a context created elsewhere as this thread's global |
| `initThreadBrokerContext(): BrokerContext` | create + set as thread-global in one call |
| `threadGlobalBrokerContext()` | read current thread global (lock-free) |
Async scoped swap (needs chronos loop): `lockGlobalBrokerContext` /
`lockNewGlobalBrokerContext` templates.
---
## Multi-thread variants `(mt)`
Add `(mt)`. Same surface, but **`emit` becomes async** (cross-thread dispatch
via `Channel[T]`). Build with `--threads:on`.
```nim
EventBroker(mt):
type Job = object
id*: int
# from any thread:
proc worker() {.thread.} =
waitFor Job.emit(Job(id: 1)) # mt emit is async — await / waitFor it
```
- Same-thread calls take a direct fast path; cross-thread go through a per-bucket
channel drained by one dispatch coroutine. fd cost is **O(threads)**, not per-broker.
- A thread that listens must keep its event loop alive (the broker dispatches on it).
- MT brokers accept capacity kwargs: `EventBroker(mt, queueDepth = ..., slabCapacity = ...,
maxPayloadBytes = ..., preset = "...")`. Omit for defaults.
---
## Decision guide
| You want… | Use |
|-----------|-----|
| Notify N listeners, don't care about replies | `EventBroker` |
| Ask one authority for an answer | `RequestBroker` |
| Blocking call, no async context | `RequestBroker(sync)` |
| Ask everyone, aggregate replies | `MultiRequestBroker` |
| Same pattern across OS threads | add `(mt)`, `--threads:on`, await `emit` |
| Multiple isolated instances | pass a `BrokerContext` first arg |
| Expose to C/C++/Python/Rust/Go | `(API)` + `registerBrokerLibrary` (see AGENTS.md) |
## Gotchas
- Handlers/providers are `raises: []` — never let an exception escape; return `err()`.
- `setProvider` on a RequestBroker that already has one **fails** — clear first.
- Single-thread `emit` returns immediately; await a yield before asserting in tests.
- A non-`object`/`ref object` broker type is auto-wrapped in `distinct`; construct
with `T(value)` and read with the base-type conversion.
- Keep all interaction with one context on one thread (single-thread brokers are
thread-local); cross-thread requires the `(mt)` variant.
---
## FFI API `(API)` — expose brokers as a C/C++/Python/Rust/Go shared library
Add `(API)` to `RequestBroker`/`EventBroker`. Same declaration syntax — it
additionally generates a fixed C ABI and typed foreign wrappers. Wire format is
CBOR; wrappers carry the typed surface. Build with `-d:BrokerFfiApi --threads:on
--app:lib`.
```nim
{.push raises: [].}
import brokers/[event_broker, request_broker, broker_context, api_library]
# Plain Nim object types used in signatures are AUTO-registered — no annotation.
type DeviceInfo* = object
deviceId*: int64
name*: string
online*: bool
RequestBroker(API):
type GetDevice = object # broker name == type name == response payload
deviceId*: int64
name*: string
proc signature*(deviceId: int64): Future[Result[GetDevice, string]] {.async.}
EventBroker(API):
type DeviceStatusChanged = object
deviceId*: int64
online*: bool
timestampMs*: int64
```
Providers + event emission live in one proc named **`setupProviders`** (the
generated runtime calls it on the processing thread during `createContext`):
```nim
proc setupProviders(ctx: BrokerContext): Result[void, string] =
let r = GetDevice.setProvider(ctx, # always pass the ctx the runtime gives you
proc(deviceId: int64): Future[Result[GetDevice, string]] {.closure, async.} =
await DeviceStatusChanged.emit(ctx, # API emit is async — await it
DeviceStatusChanged(deviceId: deviceId, online: true, timestampMs: 0))
ok(GetDevice(deviceId: deviceId, name: "u")))
if r.isErr(): return err("register GetDevice: " & r.error())
ok()
# MUST be the last declaration in the module:
registerBrokerLibrary:
name: "mylib" # MUST match --nimMainPrefix and the .so basename
version: "1.0.0" # baked into <lib>_version() static string
initializeRequest: InitializeRequest # post-create config broker (optional)
shutdownRequest: ShutdownRequest # orderly teardown broker (optional)
{.pop.}
```
Build (name / `--nimMainPrefix` / `registerBrokerLibrary name` must all match):
```
nim c -d:BrokerFfiApi --threads:on --app:lib --path:. \
--outdir:build --nimMainPrefix:mylib mylib.nim
```
What you get — a fixed **11-function C ABI** per library: `_version`,
`_initialize` (once per process), `_createContext` (per instance), `_shutdown(ctx)`,
`_allocBuffer`, `_freeBuffer`, `_call`, `_subscribe`, `_unsubscribe`, `_listApis`,
`_getSchema`. `<lib>.h` (C) and `<lib>.hpp` (C++) are always emitted.
| Flag | Emits | Notes |
|------|-------|-------|
| *(default)* | `<lib>.h`, `<lib>.hpp` | C + C++ always |
| `-d:BrokerFfiApiGenPy` | `<lib>.py` (cbor2) | next to the `.so` |
| `-d:BrokerFfiApiGenRust` | `<lib>_rs/` Cargo crate | ciborium + serde |
| `-d:BrokerFfiApiGenGo` | `<lib>_go/` Go module | fxamacker/cbor |
FFI rules:
- `registerBrokerLibrary` is a **no-op without `-d:BrokerFfiApi`** — no `when defined`
guard needed; the normal in-process broker API still works.
- `(API)` brokers ride the MT lane, so they accept the same capacity kwargs as
`(mt)`: `RequestBroker(API, queueDepth = .., slabCapacity = .., maxPayloadBytes = ..,
preset = "..")`.
- `_createContext()` is readiness-synchronous: returns only after providers +
listeners are installed and the event courier is live.
- Inspect generated Nim with `-d:brokerDebug``build/broker_debug/*.gen.nim`.
---
## BrokerInterface / BrokerImplement — hierarchical / OOP layer
An object-oriented facade over the brokers: an **interface** groups several
brokers behind one abstract type; an **implementation** provides per-instance
methods. Each instance gets its own `BrokerContext`, so two instances of the same
impl are fully isolated. Direct `instance.method()` calls **tunnel through broker
dispatch** (so provider mocks are honored — not a plain vtable call).
```nim
import brokers/broker_interface
import brokers/broker_implement
BrokerInterface(IGreeter):
EventBroker:
type Greeted = object
who: string
RequestBroker:
proc greet(name: string): Future[Result[string, string]] {.async.}
RequestBroker:
proc version(): Future[Result[string, string]] {.async.}
type GreeterImpl = ref object of IGreeter # MUST be `ref object of <Interface>`
prefix: string
BrokerImplement GreeterImpl of IGreeter:
proc new(T: typedesc[GreeterImpl], prefix: string): GreeterImpl =
GreeterImpl(prefix: prefix) # optional ctor; create() calls it
method greet(self: GreeterImpl, name: string): Future[Result[string, string]] {.async.} =
ok(self.prefix & name)
method version(self: GreeterImpl): Future[Result[string, string]] {.async.} =
ok("v2")
```
Use it:
```nim
let g = GreeterImpl.create(prefix = "hi ") # new() + wires providers under g.brokerCtx
echo (waitFor g.greet("sue")).value # "hi sue" — tunnels through Greet broker
let base: IGreeter = g # virtual dispatch via the interface type
echo (waitFor base.greet("x")).value # resolves to the override
# Each instance is isolated by its own context:
let a = GreeterImpl.create(prefix = "a:")
let b = GreeterImpl.create(prefix = "b:")
# a.brokerCtx != b.brokerCtx
g.close() # clears THIS instance's providers + listeners; idempotent
```
Event facade (instance-scoped listen/emit — context is injected for you):
```nim
discard g.listen(Greeted,
proc(ev: Greeted): Future[void] {.async: (raises: []), gcsafe.} = …)
g.emit(Greeted, Greeted(who: "bob"))
```
Factory / dependency injection (resolve an impl behind the interface):
```nim
IGreeter.provideFactory(
proc(cfg: string): Result[IGreeter, string] =
ok(GreeterImpl.create(prefix = cfg)))
let d = IGreeter.create("cfg:") # Result[IGreeter, string]; last factory wins
```
Key points:
- The broker for `proc greet` is named **`Greet`** (Capitalized verb). Address it
directly with the instance context: `Greet.request(g.brokerCtx, "bob")`,
`Greet.clearProvider(g.brokerCtx)` (e.g. to install a mock).
- `Impl.create(args…)` = fresh context + `new` + provider wiring.
`Impl.createUnderContext(ctx, args…)` wires under an externally-supplied context
(the path the FFI runtime drives).
- `BrokerInterface(API, IName)` lowers the sub-brokers onto the MT/FFI lane so the
whole interface can be exposed as a shared library; `BrokerImplement` is unchanged.
- Sub-instances returned from a method (factory pattern) share the parent's
`classCtx` (routing) but get a distinct `instanceCtx` — see `classCtx()` /
`instanceCtx()` accessors.

View File

@ -328,8 +328,8 @@
}
},
"brokers": {
"version": "#v3.1.1",
"vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2",
"version": "#cf5ee65cc20211068d7191de7e5e177c0dc212fa",
"vcsRevision": "cf5ee65cc20211068d7191de7e5e177c0dc212fa",
"url": "https://github.com/NagyZoltanPeter/nim-brokers.git",
"downloadMethod": "git",
"dependencies": [
@ -341,7 +341,7 @@
"cbor_serialization"
],
"checksums": {
"sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40"
"sha1": "9bb46550b5bb9bde6722697d678ea236e2143350"
}
},
"stint": {

View File

@ -32,35 +32,35 @@ proc waitForConnectionStatus(
var future = newFuture[void]("waitForConnectionStatus")
let handler: EventConnectionStatusChangeListenerProc = proc(
e: EventConnectionStatusChange
e: ConnectionStatusChangeEvent
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.connectionStatus == expected:
future.complete()
let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr:
let handle = ConnectionStatusChangeEvent.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
if not await future.withTimeout(TestTimeout):
raiseAssert "Timeout waiting for status: " & $expected
finally:
await EventConnectionStatusChange.dropListener(brokerCtx, handle)
await ConnectionStatusChangeEvent.dropListener(brokerCtx, handle)
proc waitForShardHealthy(
brokerCtx: BrokerContext
): Future[EventShardTopicHealthChange] {.async.} =
var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")
): Future[ShardTopicHealthChangeEvent] {.async.} =
var future = newFuture[ShardTopicHealthChangeEvent]("waitForShardHealthy")
let handler: EventShardTopicHealthChangeListenerProc = proc(
e: EventShardTopicHealthChange
e: ShardTopicHealthChangeEvent
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.health == TopicHealth.MINIMALLY_HEALTHY or
e.health == TopicHealth.SUFFICIENTLY_HEALTHY:
future.complete(e)
let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr:
let handle = ShardTopicHealthChangeEvent.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
@ -69,7 +69,7 @@ proc waitForShardHealthy(
else:
raiseAssert "Timeout waiting for shard health event"
finally:
await EventShardTopicHealthChange.dropListener(brokerCtx, handle)
await ShardTopicHealthChangeEvent.dropListener(brokerCtx, handle)
suite "LM API health checking":
var
@ -94,7 +94,7 @@ suite "LM API health checking":
lockNewGlobalBrokerContext:
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Core
conf.mode = some(WakuMode.Core)
conf.listenAddress = parseIpAddress("0.0.0.0")
conf.tcpPort = Port(0)
conf.discv5UdpPort = Port(0)
@ -193,7 +193,7 @@ suite "LM API health checking":
check isConnected == true
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
asyncTest "ConnectionStatusChangeEvent, detect connect and disconnect":
let connectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected)
@ -205,7 +205,7 @@ suite "LM API health checking":
await client.node.disconnectNode(servicePeerInfo)
await disconnectFuture
asyncTest "EventShardTopicHealthChange, detect health improvement":
asyncTest "ShardTopicHealthChangeEvent, detect health improvement":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
let healthEventFuture = waitForShardHealthy(client.brokerCtx)
@ -273,7 +273,7 @@ suite "LM API health checking":
lockNewGlobalBrokerContext:
var edgeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
edgeConf.mode = Edge
edgeConf.mode = some(WakuMode.Edge)
edgeConf.listenAddress = parseIpAddress("0.0.0.0")
edgeConf.tcpPort = Port(0)
edgeConf.discv5UdpPort = Port(0)

View File

@ -16,7 +16,7 @@ suite "WakuNodeConf - mode-driven toWakuConf":
## Given
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Core
conf.mode = some(WakuMode.Core)
conf.clusterId = some(1'u16)
## When
@ -37,7 +37,7 @@ suite "WakuNodeConf - mode-driven toWakuConf":
## Given
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Edge
conf.mode = some(WakuMode.Edge)
conf.clusterId = some(1'u16)
## When
@ -54,11 +54,11 @@ suite "WakuNodeConf - mode-driven toWakuConf":
wakuConf.storeServiceConf.isSome() == false
wakuConf.peerExchangeService == true
test "noMode uses explicit CLI flags as-is":
test "WakuMode.none uses explicit CLI flags as-is":
## Given
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = cli_args.WakuMode.noMode
conf.mode = none[WakuMode]()
conf.relay = true
conf.lightpush = false
conf.clusterId = some(5'u16)
@ -79,7 +79,7 @@ suite "WakuNodeConf - mode-driven toWakuConf":
## Given - user sets relay=false but mode=Core should override
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Core
conf.mode = some(WakuMode.Core)
conf.relay = false # will be overridden by Core mode
## When
@ -101,7 +101,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs":
require confRes.isOk()
let conf = confRes.get()
check:
conf.mode == cli_args.WakuMode.noMode
conf.mode == none[WakuMode]()
conf.clusterId.isNone()
conf.logLevel == logging.LogLevel.INFO
@ -113,7 +113,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs":
require confRes.isOk()
let conf = confRes.get()
check:
conf.mode == Core
conf.mode == some(WakuMode.Core)
conf.clusterId == some(42'u16)
test "JSON with Edge mode":
@ -124,7 +124,7 @@ suite "WakuNodeConf - JSON parsing with fieldPairs":
require confRes.isOk()
let conf = confRes.get()
check:
conf.mode == Edge
conf.mode == some(WakuMode.Edge)
test "JSON with logLevel":
## Given / When

View File

@ -228,8 +228,7 @@ suite "Health Monitor - events":
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds =
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
let ds = MessagingClient.new(false, nodeA)
ds.start().expect("Failed to start MessagingClient")
let monitorA = NodeHealthMonitor.new(nodeA)
@ -332,8 +331,7 @@ suite "Health Monitor - events":
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds =
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
let ds = MessagingClient.new(false, nodeA)
ds.start().expect("Failed to start MessagingClient")
let subMgr = nodeA.subscriptionManager
@ -392,13 +390,13 @@ suite "Health Monitor - events":
check lastStatus == ConnectionStatus.PartiallyConnected
var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth")
var shardHealthFut = newFuture[ShardTopicHealthChangeEvent]("waitForShardHealth")
let shardHealthLis = EventShardTopicHealthChange
let shardHealthLis = ShardTopicHealthChangeEvent
.listen(
nodeA.brokerCtx,
proc(
evt: EventShardTopicHealthChange
evt: ShardTopicHealthChangeEvent
): Future[void] {.async: (raises: []), gcsafe.} =
if not shardHealthFut.finished and (
evt.health == TopicHealth.MINIMALLY_HEALTHY or
@ -413,7 +411,7 @@ suite "Health Monitor - events":
subMgr.subscribe(contentTopic).expect("Failed to subscribe")
let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit)
await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)
await ShardTopicHealthChangeEvent.dropListener(nodeA.brokerCtx, shardHealthLis)
check shardHealthOk == true
check nodeA.subscriptionManager.edgeFilterSubStates.len > 0

View File

@ -14,7 +14,7 @@ suite "Waku API - Create node":
## Given
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert "defaultWakuNodeConf failed: " & error
nodeConf.mode = Core
nodeConf.mode = some(WakuMode.Core)
nodeConf.clusterId = some(3'u16)
nodeConf.rest = false
@ -34,7 +34,7 @@ suite "Waku API - Create node":
## Given
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert "defaultWakuNodeConf failed: " & error
nodeConf.mode = Core
nodeConf.mode = some(WakuMode.Core)
nodeConf.clusterId = some(99'u16)
nodeConf.rest = false
nodeConf.numShardsInNetwork = 16
@ -66,7 +66,7 @@ suite "Waku API - Create node":
## Given
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert "defaultWakuNodeConf failed: " & error
nodeConf.mode = Core
nodeConf.mode = some(WakuMode.Core)
nodeConf.clusterId = some(42'u16)
nodeConf.rest = false
nodeConf.entryNodes = @[

View File

@ -20,6 +20,8 @@ import
secp256k1,
json
from logos_delivery/api/types import WakuMode
import
logos_delivery/waku/factory/[waku_conf, conf_builder/conf_builder, networks_config],
logos_delivery/waku/common/[logging],
@ -37,7 +39,7 @@ import ./envvar as confEnvvarDefs, ./envvar_net as confEnvvarNet
export
confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet, ProtectedShard,
DefaultMaxWakuMessageSizeStr, DefaultAgentString
DefaultMaxWakuMessageSizeStr, DefaultAgentString, WakuMode
logScope:
topics = "waku cli args"
@ -60,11 +62,6 @@ type StartUpCommand* = enum
noCommand # default, runs waku
generateRlnKeystore # generates a new RLN keystore
type WakuMode* {.pure.} = enum
noMode # default - use explicit CLI flags as-is
Core # full service node
Edge # client-only node
type WakuNodeConf* = object
configFile* {.
desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)",
@ -168,10 +165,10 @@ type WakuNodeConf* = object
## General node config
mode* {.
desc:
"Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default: explicit CLI flags used.",
defaultValue: WakuMode.noMode,
"Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default (unset): explicit CLI flags used.",
defaultValue: none(WakuMode),
name: "mode"
.}: WakuMode
.}: Option[WakuMode]
preset* {.
desc:
@ -958,6 +955,25 @@ proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] =
except CatchableError:
err(getCurrentExceptionMsg())
proc loadWakuNodeConfFromFile*(configFile: string): ConfResult[WakuNodeConf] =
## Load a WakuNodeConf from a specific TOML config file path (no CLI args).
## Deliberately a CONCRETE proc (no `T: type` typedesc param): that keeps it
## non-generic, so confutils' `load` macro expands HERE in cli_args' full scope
## (its own `WakuMode` with `noMode`, `logging`, …) instead of at the caller's
## scope — where a different `WakuMode` (api/types) would break `noMode`.
try:
let conf = WakuNodeConf.load(
version = "",
cmdLine = @[],
secondarySources = proc(
conf: WakuNodeConf, sources: auto
) {.gcsafe, raises: [ConfigurationError].} =
sources.addConfigFile(Toml, InputFile(configFile)),
)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())
proc defaultWakuNodeConf*(): ConfResult[WakuNodeConf] =
try:
let conf = WakuNodeConf.load(version = "", cmdLine = @[])
@ -1204,25 +1220,25 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
chronos.seconds(n.kadServiceLookupIntervalSec.int64)
)
# Mode-driven configuration overrides
case n.mode
of WakuMode.Core:
b.withRelay(true)
b.filterServiceConf.withEnabled(true)
b.withLightPush(true)
b.discv5Conf.withEnabled(true)
b.withPeerExchange(true)
b.withRendezvous(true)
b.rateLimitConf.withRateLimitsIfNotAssigned(
@["filter:100/1s", "lightpush:5/1s", "px:5/1s"]
)
of WakuMode.Edge:
b.withPeerExchange(true)
b.withRelay(false)
b.filterServiceConf.withEnabled(false)
b.withLightPush(false)
b.storeServiceConf.withEnabled(false)
of WakuMode.noMode:
discard # use explicit CLI flags as-is
# Mode-driven configuration overrides. `none` (formerly `noMode`) means:
# use explicit CLI flags as-is.
if n.mode.isSome():
case n.mode.get()
of WakuMode.Core:
b.withRelay(true)
b.filterServiceConf.withEnabled(true)
b.withLightPush(true)
b.discv5Conf.withEnabled(true)
b.withPeerExchange(true)
b.withRendezvous(true)
b.rateLimitConf.withRateLimitsIfNotAssigned(
@["filter:100/1s", "lightpush:5/1s", "px:5/1s"]
)
of WakuMode.Edge:
b.withPeerExchange(true)
b.withRelay(false)
b.filterServiceConf.withEnabled(false)
b.withLightPush(false)
b.storeServiceConf.withEnabled(false)
return b.build()