feat: add Messaging API (pkg/messaging)

Implement the high-level, idiomatic Go Messaging API mirroring the Nim
MessagingClient, on top of the internal/ffi/liblogosdelivery bridge.

- MessagingClient: New/Start/Stop/Close, Subscribe/Unsubscribe,
  Send -> RequestID. (Named to match the Nim MessagingClient.)
- Unified Events() <-chan Event with a sealed Event interface
  (MessageReceived/Sent/Propagated/Error, ConnectionStatus). Events are dropped
  (never block) if a consumer falls behind.
- Event decoding handles liblogosdelivery's std/json wire format: received
  payload/meta arrive as JSON byte-int arrays (not base64), with base64 + null
  fallbacks; connectionStatus as an enum-name string. Unit-tested.
- Config aliases the kernel WakuNodeConf.
- examples/messaging: runnable demo.

Part of #106 (Store + kernel accessors follow after logos-delivery#3851).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Igor Sirotin 2026-06-10 13:00:00 +03:00
parent 14c9199bff
commit 43ed60b641
No known key found for this signature in database
GPG Key ID: 0EABBCB40CB9AD4A
7 changed files with 541 additions and 0 deletions

View File

@ -0,0 +1,83 @@
// Command messaging is a minimal demonstration of the Messaging API binding:
// create a client, start it, subscribe to a content topic, send a message, and
// print the events that arrive.
//
// Build/run requires liblogosdelivery at link time, e.g.:
//
// export LOGOS_DELIVERY_DIR=/abs/path/to/logos-delivery
// export CGO_CFLAGS="-I$LOGOS_DELIVERY_DIR/liblogosdelivery"
// export CGO_LDFLAGS="-L$LOGOS_DELIVERY_DIR/build -Wl,-rpath,$LOGOS_DELIVERY_DIR/build"
// go run ./examples/messaging
package main
import (
"context"
"log"
"time"
"github.com/logos-messaging/logos-delivery-go-bindings/pkg/messaging"
)
func main() {
const contentTopic = "/my-app/1/chat/proto"
cfg := messaging.Config{
Relay: true,
ClusterID: 16,
Shards: []uint16{0},
NumShardsInNetwork: 8,
LogLevel: "INFO",
}
client, err := messaging.New(cfg)
if err != nil {
log.Fatalf("new client: %v", err)
}
defer func() {
if err := client.Close(); err != nil {
log.Printf("close: %v", err)
}
}()
if err := client.Start(); err != nil {
log.Fatalf("start: %v", err)
}
defer func() {
if err := client.Stop(); err != nil {
log.Printf("stop: %v", err)
}
}()
if err := client.Subscribe(contentTopic); err != nil {
log.Fatalf("subscribe: %v", err)
}
// Print incoming events until the program exits.
go func() {
for ev := range client.Events() {
switch e := ev.(type) {
case messaging.MessageReceivedEvent:
log.Printf("received on %s: %q", e.Message.ContentTopic, e.Message.Payload)
case messaging.MessageSentEvent:
log.Printf("sent: req=%s hash=%s", e.RequestID, e.MessageHash)
case messaging.MessagePropagatedEvent:
log.Printf("propagated: req=%s", e.RequestID)
case messaging.MessageErrorEvent:
log.Printf("error: req=%s %s", e.RequestID, e.Err)
case messaging.ConnectionStatusEvent:
log.Printf("connection status: %s", e.Status)
}
}
}()
reqID, err := client.Send(context.Background(), messaging.Envelope{
ContentTopic: contentTopic,
Payload: []byte("hello logos"),
})
if err != nil {
log.Fatalf("send: %v", err)
}
log.Printf("queued message, request id: %s", reqID)
time.Sleep(5 * time.Second)
}

8
pkg/messaging/config.go Normal file
View File

@ -0,0 +1,8 @@
package messaging
import "github.com/logos-messaging/logos-delivery-go-bindings/pkg/kernel/common"
// Config is the node configuration passed to the underlying library. Its JSON
// representation is a WakuNodeConf, which is what logosdelivery_create_node
// consumes. It aliases the kernel config type so the two stay in lockstep.
type Config = common.WakuConfig

View File

@ -2,4 +2,8 @@
// logos-delivery Messaging API (an opinionated layer over the kernel protocols
// that owns reliability, re-subscriptions, store-based catch-up and the
// Messaging event surface).
//
// It exposes a MessagingClient (create/start/stop, send/subscribe/unsubscribe) and a
// unified Events channel, backed by cgo calls into liblogosdelivery via the
// internal/ffi package.
package messaging

17
pkg/messaging/envelope.go Normal file
View File

@ -0,0 +1,17 @@
package messaging
// ContentTopic is an application-level message category, e.g.
// "/my-app/1/chat/proto".
type ContentTopic = string
// RequestID correlates a Send call with its later MessageSentEvent /
// MessagePropagatedEvent / MessageErrorEvent.
type RequestID string
// Envelope is an outgoing Messaging API message.
type Envelope struct {
ContentTopic ContentTopic
Payload []byte
// Ephemeral marks the message as transient (not stored).
Ephemeral bool
}

213
pkg/messaging/event.go Normal file
View File

@ -0,0 +1,213 @@
package messaging
import (
"encoding/base64"
"encoding/json"
"fmt"
)
// ConnectionStatus reports the node's overall connectivity.
type ConnectionStatus int
const (
Disconnected ConnectionStatus = iota
PartiallyConnected
Connected
)
func (s ConnectionStatus) String() string {
switch s {
case Disconnected:
return "Disconnected"
case PartiallyConnected:
return "PartiallyConnected"
case Connected:
return "Connected"
default:
return fmt.Sprintf("ConnectionStatus(%d)", int(s))
}
}
// Message is a received message (the underlying WakuMessage).
type Message struct {
ContentTopic ContentTopic
Payload []byte
Meta []byte
Version uint32
// Timestamp is sender-generated, in nanoseconds.
Timestamp int64
Ephemeral bool
}
// Event is the sealed interface implemented by every Messaging API event
// delivered on MessagingClient.Events(). Consumers type-switch over the concrete types.
type Event interface {
isMessagingEvent()
}
// MessageReceivedEvent is emitted when a message is received from the network.
type MessageReceivedEvent struct {
MessageHash string
Message Message
}
// MessageSentEvent is emitted when a message has been accepted and queued for
// delivery by the send service.
type MessageSentEvent struct {
RequestID RequestID
MessageHash string
}
// MessagePropagatedEvent is emitted when a message has been propagated to
// neighbouring nodes.
type MessagePropagatedEvent struct {
RequestID RequestID
MessageHash string
}
// MessageErrorEvent is emitted when sending or propagating a message fails.
type MessageErrorEvent struct {
RequestID RequestID
MessageHash string
Err string
}
// ConnectionStatusEvent is emitted when the node's connectivity changes.
type ConnectionStatusEvent struct {
Status ConnectionStatus
}
func (MessageReceivedEvent) isMessagingEvent() {}
func (MessageSentEvent) isMessagingEvent() {}
func (MessagePropagatedEvent) isMessagingEvent() {}
func (MessageErrorEvent) isMessagingEvent() {}
func (ConnectionStatusEvent) isMessagingEvent() {}
// wireBytes decodes a byte field that liblogosdelivery serialises with
// std/json defaults. On receive (WakuMessage) that is a JSON array of byte
// integers; we also accept a base64 string and null for robustness.
type wireBytes []byte
func (b *wireBytes) UnmarshalJSON(data []byte) error {
if len(data) == 0 || string(data) == "null" {
*b = nil
return nil
}
if data[0] == '[' {
var nums []int
if err := json.Unmarshal(data, &nums); err != nil {
return err
}
out := make([]byte, len(nums))
for i, n := range nums {
out[i] = byte(n)
}
*b = out
return nil
}
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
dec, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return err
}
*b = dec
return nil
}
// decodeEvent parses a flat event JSON string from liblogosdelivery into a
// typed Event. Unknown event types return a nil Event and no error so callers
// can ignore them.
func decodeEvent(eventJSON string) (Event, error) {
var head struct {
EventType string `json:"eventType"`
}
if err := json.Unmarshal([]byte(eventJSON), &head); err != nil {
return nil, fmt.Errorf("decode event: %w", err)
}
switch head.EventType {
case "message_received":
var e struct {
MessageHash string `json:"messageHash"`
Message struct {
ContentTopic string `json:"contentTopic"`
Payload wireBytes `json:"payload"`
Meta wireBytes `json:"meta"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Ephemeral bool `json:"ephemeral"`
} `json:"message"`
}
if err := json.Unmarshal([]byte(eventJSON), &e); err != nil {
return nil, fmt.Errorf("decode message_received: %w", err)
}
return MessageReceivedEvent{
MessageHash: e.MessageHash,
Message: Message{
ContentTopic: e.Message.ContentTopic,
Payload: e.Message.Payload,
Meta: e.Message.Meta,
Version: e.Message.Version,
Timestamp: e.Message.Timestamp,
Ephemeral: e.Message.Ephemeral,
},
}, nil
case "message_sent":
var e struct {
RequestID string `json:"requestId"`
MessageHash string `json:"messageHash"`
}
if err := json.Unmarshal([]byte(eventJSON), &e); err != nil {
return nil, fmt.Errorf("decode message_sent: %w", err)
}
return MessageSentEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash}, nil
case "message_propagated":
var e struct {
RequestID string `json:"requestId"`
MessageHash string `json:"messageHash"`
}
if err := json.Unmarshal([]byte(eventJSON), &e); err != nil {
return nil, fmt.Errorf("decode message_propagated: %w", err)
}
return MessagePropagatedEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash}, nil
case "message_error":
var e struct {
RequestID string `json:"requestId"`
MessageHash string `json:"messageHash"`
Error string `json:"error"`
}
if err := json.Unmarshal([]byte(eventJSON), &e); err != nil {
return nil, fmt.Errorf("decode message_error: %w", err)
}
return MessageErrorEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash, Err: e.Error}, nil
case "connection_status_change":
var e struct {
ConnectionStatus string `json:"connectionStatus"`
}
if err := json.Unmarshal([]byte(eventJSON), &e); err != nil {
return nil, fmt.Errorf("decode connection_status_change: %w", err)
}
return ConnectionStatusEvent{Status: parseConnectionStatus(e.ConnectionStatus)}, nil
default:
return nil, nil
}
}
func parseConnectionStatus(s string) ConnectionStatus {
switch s {
case "Connected":
return Connected
case "PartiallyConnected":
return PartiallyConnected
default:
return Disconnected
}
}

107
pkg/messaging/event_test.go Normal file
View File

@ -0,0 +1,107 @@
package messaging
import (
"bytes"
"testing"
)
func TestDecodeEvent(t *testing.T) {
t.Run("message_received with int-array payload", func(t *testing.T) {
// liblogosdelivery serialises WakuMessage via std/json: payload/meta are
// arrays of byte integers, not base64. "hi" = [104,105].
const raw = `{"eventType":"message_received","messageHash":"0xabc",` +
`"message":{"contentTopic":"/app/1/c/proto","payload":[104,105],` +
`"meta":[1,2],"version":1,"timestamp":1717000000000000000,"ephemeral":true}}`
ev, err := decodeEvent(raw)
if err != nil {
t.Fatalf("decodeEvent: %v", err)
}
got, ok := ev.(MessageReceivedEvent)
if !ok {
t.Fatalf("got %T, want MessageReceivedEvent", ev)
}
if got.MessageHash != "0xabc" {
t.Errorf("messageHash = %q", got.MessageHash)
}
if !bytes.Equal(got.Message.Payload, []byte("hi")) {
t.Errorf("payload = %v, want %v", got.Message.Payload, []byte("hi"))
}
if !bytes.Equal(got.Message.Meta, []byte{1, 2}) {
t.Errorf("meta = %v", got.Message.Meta)
}
if got.Message.ContentTopic != "/app/1/c/proto" {
t.Errorf("contentTopic = %q", got.Message.ContentTopic)
}
if got.Message.Version != 1 || got.Message.Timestamp != 1717000000000000000 || !got.Message.Ephemeral {
t.Errorf("scalar fields wrong: %+v", got.Message)
}
})
t.Run("message_received with base64 payload (robustness)", func(t *testing.T) {
const raw = `{"eventType":"message_received","messageHash":"0x1",` +
`"message":{"contentTopic":"/a/1/b/proto","payload":"aGk=","ephemeral":false}}`
ev, err := decodeEvent(raw)
if err != nil {
t.Fatalf("decodeEvent: %v", err)
}
if got := ev.(MessageReceivedEvent); !bytes.Equal(got.Message.Payload, []byte("hi")) {
t.Errorf("payload = %v", got.Message.Payload)
}
})
t.Run("message_sent", func(t *testing.T) {
ev, err := decodeEvent(`{"eventType":"message_sent","requestId":"req-1","messageHash":"0x9"}`)
if err != nil {
t.Fatal(err)
}
got, ok := ev.(MessageSentEvent)
if !ok || got.RequestID != "req-1" || got.MessageHash != "0x9" {
t.Fatalf("got %#v", ev)
}
})
t.Run("message_propagated", func(t *testing.T) {
ev, err := decodeEvent(`{"eventType":"message_propagated","requestId":"req-2","messageHash":"0x8"}`)
if err != nil {
t.Fatal(err)
}
if got, ok := ev.(MessagePropagatedEvent); !ok || got.RequestID != "req-2" {
t.Fatalf("got %#v", ev)
}
})
t.Run("message_error", func(t *testing.T) {
ev, err := decodeEvent(`{"eventType":"message_error","requestId":"req-3","messageHash":"0x7","error":"boom"}`)
if err != nil {
t.Fatal(err)
}
got, ok := ev.(MessageErrorEvent)
if !ok || got.RequestID != "req-3" || got.Err != "boom" {
t.Fatalf("got %#v", ev)
}
})
t.Run("connection_status_change", func(t *testing.T) {
for in, want := range map[string]ConnectionStatus{
"Connected": Connected,
"PartiallyConnected": PartiallyConnected,
"Disconnected": Disconnected,
} {
raw := `{"eventType":"connection_status_change","connectionStatus":"` + in + `"}`
ev, err := decodeEvent(raw)
if err != nil {
t.Fatalf("%s: %v", in, err)
}
if got := ev.(ConnectionStatusEvent); got.Status != want {
t.Errorf("%s -> %v, want %v", in, got.Status, want)
}
}
})
t.Run("unknown event type is ignored", func(t *testing.T) {
ev, err := decodeEvent(`{"eventType":"something_new","foo":1}`)
if err != nil || ev != nil {
t.Fatalf("got ev=%v err=%v, want nil,nil", ev, err)
}
})
}

View File

@ -0,0 +1,109 @@
package messaging
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"github.com/logos-messaging/logos-delivery-go-bindings/internal/ffi/liblogosdelivery"
)
// eventBufferSize bounds the buffered Events channel. Events are dropped (never
// blocked) if a consumer falls behind, so the FFI worker thread is never stalled.
const eventBufferSize = 1024
// MessagingClient is a logos-delivery Messaging API client (the Nim
// MessagingClient). Create it with New, then Start it; consume incoming events
// from Events(); release it with Close.
type MessagingClient struct {
h liblogosdelivery.Handle
events chan Event
closeOnce sync.Once
}
// New creates (but does not start) a MessagingClient from cfg.
func New(cfg Config) (*MessagingClient, error) {
cfgJSON, err := json.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("marshal config: %w", err)
}
h, err := liblogosdelivery.New(string(cfgJSON))
if err != nil {
return nil, err
}
c := &MessagingClient{h: h, events: make(chan Event, eventBufferSize)}
liblogosdelivery.SetEventHandler(h, c.onEvent)
return c, nil
}
// onEvent runs on the FFI worker thread: decode and hand off without blocking.
func (c *MessagingClient) onEvent(ret int, msg string) {
if ret != liblogosdelivery.RetOK {
return
}
ev, err := decodeEvent(msg)
if err != nil || ev == nil {
return
}
select {
case c.events <- ev:
default:
// Consumer not keeping up; drop rather than block the worker thread.
}
}
// Start starts the client's protocols and Messaging API services.
func (c *MessagingClient) Start() error { return liblogosdelivery.Start(c.h) }
// Stop stops the client. It can be started again.
func (c *MessagingClient) Stop() error { return liblogosdelivery.Stop(c.h) }
// Close stops tracking events and releases the underlying node context. The
// Events channel is closed. The MessagingClient must not be used afterwards.
func (c *MessagingClient) Close() error {
err := liblogosdelivery.Destroy(c.h)
c.closeOnce.Do(func() { close(c.events) })
return err
}
// Subscribe subscribes to a content topic so messages on it are received.
func (c *MessagingClient) Subscribe(topic ContentTopic) error {
return liblogosdelivery.Subscribe(c.h, topic)
}
// Unsubscribe stops receiving messages on a content topic.
func (c *MessagingClient) Unsubscribe(topic ContentTopic) error {
return liblogosdelivery.Unsubscribe(c.h, topic)
}
// Send publishes env and returns the RequestID to correlate with the later
// MessageSentEvent / MessagePropagatedEvent / MessageErrorEvent. The send is
// fire-and-queue: ctx cancellation is honoured before dispatch only.
func (c *MessagingClient) Send(ctx context.Context, env Envelope) (RequestID, error) {
if err := ctx.Err(); err != nil {
return "", err
}
msg, err := json.Marshal(struct {
ContentTopic string `json:"contentTopic"`
Payload string `json:"payload"`
Ephemeral bool `json:"ephemeral"`
}{
ContentTopic: string(env.ContentTopic),
Payload: base64.StdEncoding.EncodeToString(env.Payload),
Ephemeral: env.Ephemeral,
})
if err != nil {
return "", fmt.Errorf("marshal envelope: %w", err)
}
id, err := liblogosdelivery.Send(c.h, string(msg))
if err != nil {
return "", err
}
return RequestID(id), nil
}
// Events returns the channel of incoming Messaging API events. Type-switch over
// the concrete Event types. The channel is closed by Close.
func (c *MessagingClient) Events() <-chan Event { return c.events }