From 43ed60b6414dabfc6fa2421574a863fbb7fac6a9 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Wed, 10 Jun 2026 13:00:00 +0300 Subject: [PATCH] feat: add Messaging API (pkg/messaging) Implement the high-level, idiomatic Go Messaging API mirroring the Nim MessagingClient, on top of the internal/ffi/liblogosdelivery bridge. - MessagingClient: New/Start/Stop/Close, Subscribe/Unsubscribe, Send -> RequestID. (Named to match the Nim MessagingClient.) - Unified Events() <-chan Event with a sealed Event interface (MessageReceived/Sent/Propagated/Error, ConnectionStatus). Events are dropped (never block) if a consumer falls behind. - Event decoding handles liblogosdelivery's std/json wire format: received payload/meta arrive as JSON byte-int arrays (not base64), with base64 + null fallbacks; connectionStatus as an enum-name string. Unit-tested. - Config aliases the kernel WakuNodeConf. - examples/messaging: runnable demo. Part of #106 (Store + kernel accessors follow after logos-delivery#3851). Co-Authored-By: Claude Opus 4.8 (1M context) --- examples/messaging/main.go | 83 ++++++++++++ pkg/messaging/config.go | 8 ++ pkg/messaging/doc.go | 4 + pkg/messaging/envelope.go | 17 +++ pkg/messaging/event.go | 213 ++++++++++++++++++++++++++++++ pkg/messaging/event_test.go | 107 +++++++++++++++ pkg/messaging/messaging_client.go | 109 +++++++++++++++ 7 files changed, 541 insertions(+) create mode 100644 examples/messaging/main.go create mode 100644 pkg/messaging/config.go create mode 100644 pkg/messaging/envelope.go create mode 100644 pkg/messaging/event.go create mode 100644 pkg/messaging/event_test.go create mode 100644 pkg/messaging/messaging_client.go diff --git a/examples/messaging/main.go b/examples/messaging/main.go new file mode 100644 index 0000000..c24a602 --- /dev/null +++ b/examples/messaging/main.go @@ -0,0 +1,83 @@ +// Command messaging is a minimal demonstration of the Messaging API binding: +// create a client, start it, subscribe to a content topic, send a message, and +// print the events that arrive. +// +// Build/run requires liblogosdelivery at link time, e.g.: +// +// export LOGOS_DELIVERY_DIR=/abs/path/to/logos-delivery +// export CGO_CFLAGS="-I$LOGOS_DELIVERY_DIR/liblogosdelivery" +// export CGO_LDFLAGS="-L$LOGOS_DELIVERY_DIR/build -Wl,-rpath,$LOGOS_DELIVERY_DIR/build" +// go run ./examples/messaging +package main + +import ( + "context" + "log" + "time" + + "github.com/logos-messaging/logos-delivery-go-bindings/pkg/messaging" +) + +func main() { + const contentTopic = "/my-app/1/chat/proto" + + cfg := messaging.Config{ + Relay: true, + ClusterID: 16, + Shards: []uint16{0}, + NumShardsInNetwork: 8, + LogLevel: "INFO", + } + + client, err := messaging.New(cfg) + if err != nil { + log.Fatalf("new client: %v", err) + } + defer func() { + if err := client.Close(); err != nil { + log.Printf("close: %v", err) + } + }() + + if err := client.Start(); err != nil { + log.Fatalf("start: %v", err) + } + defer func() { + if err := client.Stop(); err != nil { + log.Printf("stop: %v", err) + } + }() + + if err := client.Subscribe(contentTopic); err != nil { + log.Fatalf("subscribe: %v", err) + } + + // Print incoming events until the program exits. + go func() { + for ev := range client.Events() { + switch e := ev.(type) { + case messaging.MessageReceivedEvent: + log.Printf("received on %s: %q", e.Message.ContentTopic, e.Message.Payload) + case messaging.MessageSentEvent: + log.Printf("sent: req=%s hash=%s", e.RequestID, e.MessageHash) + case messaging.MessagePropagatedEvent: + log.Printf("propagated: req=%s", e.RequestID) + case messaging.MessageErrorEvent: + log.Printf("error: req=%s %s", e.RequestID, e.Err) + case messaging.ConnectionStatusEvent: + log.Printf("connection status: %s", e.Status) + } + } + }() + + reqID, err := client.Send(context.Background(), messaging.Envelope{ + ContentTopic: contentTopic, + Payload: []byte("hello logos"), + }) + if err != nil { + log.Fatalf("send: %v", err) + } + log.Printf("queued message, request id: %s", reqID) + + time.Sleep(5 * time.Second) +} diff --git a/pkg/messaging/config.go b/pkg/messaging/config.go new file mode 100644 index 0000000..7396ad6 --- /dev/null +++ b/pkg/messaging/config.go @@ -0,0 +1,8 @@ +package messaging + +import "github.com/logos-messaging/logos-delivery-go-bindings/pkg/kernel/common" + +// Config is the node configuration passed to the underlying library. Its JSON +// representation is a WakuNodeConf, which is what logosdelivery_create_node +// consumes. It aliases the kernel config type so the two stay in lockstep. +type Config = common.WakuConfig diff --git a/pkg/messaging/doc.go b/pkg/messaging/doc.go index 0c8a8e8..fd4e290 100644 --- a/pkg/messaging/doc.go +++ b/pkg/messaging/doc.go @@ -2,4 +2,8 @@ // logos-delivery Messaging API (an opinionated layer over the kernel protocols // that owns reliability, re-subscriptions, store-based catch-up and the // Messaging event surface). +// +// It exposes a MessagingClient (create/start/stop, send/subscribe/unsubscribe) and a +// unified Events channel, backed by cgo calls into liblogosdelivery via the +// internal/ffi package. package messaging diff --git a/pkg/messaging/envelope.go b/pkg/messaging/envelope.go new file mode 100644 index 0000000..0e794cd --- /dev/null +++ b/pkg/messaging/envelope.go @@ -0,0 +1,17 @@ +package messaging + +// ContentTopic is an application-level message category, e.g. +// "/my-app/1/chat/proto". +type ContentTopic = string + +// RequestID correlates a Send call with its later MessageSentEvent / +// MessagePropagatedEvent / MessageErrorEvent. +type RequestID string + +// Envelope is an outgoing Messaging API message. +type Envelope struct { + ContentTopic ContentTopic + Payload []byte + // Ephemeral marks the message as transient (not stored). + Ephemeral bool +} diff --git a/pkg/messaging/event.go b/pkg/messaging/event.go new file mode 100644 index 0000000..c50b0c5 --- /dev/null +++ b/pkg/messaging/event.go @@ -0,0 +1,213 @@ +package messaging + +import ( + "encoding/base64" + "encoding/json" + "fmt" +) + +// ConnectionStatus reports the node's overall connectivity. +type ConnectionStatus int + +const ( + Disconnected ConnectionStatus = iota + PartiallyConnected + Connected +) + +func (s ConnectionStatus) String() string { + switch s { + case Disconnected: + return "Disconnected" + case PartiallyConnected: + return "PartiallyConnected" + case Connected: + return "Connected" + default: + return fmt.Sprintf("ConnectionStatus(%d)", int(s)) + } +} + +// Message is a received message (the underlying WakuMessage). +type Message struct { + ContentTopic ContentTopic + Payload []byte + Meta []byte + Version uint32 + // Timestamp is sender-generated, in nanoseconds. + Timestamp int64 + Ephemeral bool +} + +// Event is the sealed interface implemented by every Messaging API event +// delivered on MessagingClient.Events(). Consumers type-switch over the concrete types. +type Event interface { + isMessagingEvent() +} + +// MessageReceivedEvent is emitted when a message is received from the network. +type MessageReceivedEvent struct { + MessageHash string + Message Message +} + +// MessageSentEvent is emitted when a message has been accepted and queued for +// delivery by the send service. +type MessageSentEvent struct { + RequestID RequestID + MessageHash string +} + +// MessagePropagatedEvent is emitted when a message has been propagated to +// neighbouring nodes. +type MessagePropagatedEvent struct { + RequestID RequestID + MessageHash string +} + +// MessageErrorEvent is emitted when sending or propagating a message fails. +type MessageErrorEvent struct { + RequestID RequestID + MessageHash string + Err string +} + +// ConnectionStatusEvent is emitted when the node's connectivity changes. +type ConnectionStatusEvent struct { + Status ConnectionStatus +} + +func (MessageReceivedEvent) isMessagingEvent() {} +func (MessageSentEvent) isMessagingEvent() {} +func (MessagePropagatedEvent) isMessagingEvent() {} +func (MessageErrorEvent) isMessagingEvent() {} +func (ConnectionStatusEvent) isMessagingEvent() {} + +// wireBytes decodes a byte field that liblogosdelivery serialises with +// std/json defaults. On receive (WakuMessage) that is a JSON array of byte +// integers; we also accept a base64 string and null for robustness. +type wireBytes []byte + +func (b *wireBytes) UnmarshalJSON(data []byte) error { + if len(data) == 0 || string(data) == "null" { + *b = nil + return nil + } + if data[0] == '[' { + var nums []int + if err := json.Unmarshal(data, &nums); err != nil { + return err + } + out := make([]byte, len(nums)) + for i, n := range nums { + out[i] = byte(n) + } + *b = out + return nil + } + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + dec, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return err + } + *b = dec + return nil +} + +// decodeEvent parses a flat event JSON string from liblogosdelivery into a +// typed Event. Unknown event types return a nil Event and no error so callers +// can ignore them. +func decodeEvent(eventJSON string) (Event, error) { + var head struct { + EventType string `json:"eventType"` + } + if err := json.Unmarshal([]byte(eventJSON), &head); err != nil { + return nil, fmt.Errorf("decode event: %w", err) + } + + switch head.EventType { + case "message_received": + var e struct { + MessageHash string `json:"messageHash"` + Message struct { + ContentTopic string `json:"contentTopic"` + Payload wireBytes `json:"payload"` + Meta wireBytes `json:"meta"` + Version uint32 `json:"version"` + Timestamp int64 `json:"timestamp"` + Ephemeral bool `json:"ephemeral"` + } `json:"message"` + } + if err := json.Unmarshal([]byte(eventJSON), &e); err != nil { + return nil, fmt.Errorf("decode message_received: %w", err) + } + return MessageReceivedEvent{ + MessageHash: e.MessageHash, + Message: Message{ + ContentTopic: e.Message.ContentTopic, + Payload: e.Message.Payload, + Meta: e.Message.Meta, + Version: e.Message.Version, + Timestamp: e.Message.Timestamp, + Ephemeral: e.Message.Ephemeral, + }, + }, nil + + case "message_sent": + var e struct { + RequestID string `json:"requestId"` + MessageHash string `json:"messageHash"` + } + if err := json.Unmarshal([]byte(eventJSON), &e); err != nil { + return nil, fmt.Errorf("decode message_sent: %w", err) + } + return MessageSentEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash}, nil + + case "message_propagated": + var e struct { + RequestID string `json:"requestId"` + MessageHash string `json:"messageHash"` + } + if err := json.Unmarshal([]byte(eventJSON), &e); err != nil { + return nil, fmt.Errorf("decode message_propagated: %w", err) + } + return MessagePropagatedEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash}, nil + + case "message_error": + var e struct { + RequestID string `json:"requestId"` + MessageHash string `json:"messageHash"` + Error string `json:"error"` + } + if err := json.Unmarshal([]byte(eventJSON), &e); err != nil { + return nil, fmt.Errorf("decode message_error: %w", err) + } + return MessageErrorEvent{RequestID: RequestID(e.RequestID), MessageHash: e.MessageHash, Err: e.Error}, nil + + case "connection_status_change": + var e struct { + ConnectionStatus string `json:"connectionStatus"` + } + if err := json.Unmarshal([]byte(eventJSON), &e); err != nil { + return nil, fmt.Errorf("decode connection_status_change: %w", err) + } + return ConnectionStatusEvent{Status: parseConnectionStatus(e.ConnectionStatus)}, nil + + default: + return nil, nil + } +} + +func parseConnectionStatus(s string) ConnectionStatus { + switch s { + case "Connected": + return Connected + case "PartiallyConnected": + return PartiallyConnected + default: + return Disconnected + } +} diff --git a/pkg/messaging/event_test.go b/pkg/messaging/event_test.go new file mode 100644 index 0000000..b0c17da --- /dev/null +++ b/pkg/messaging/event_test.go @@ -0,0 +1,107 @@ +package messaging + +import ( + "bytes" + "testing" +) + +func TestDecodeEvent(t *testing.T) { + t.Run("message_received with int-array payload", func(t *testing.T) { + // liblogosdelivery serialises WakuMessage via std/json: payload/meta are + // arrays of byte integers, not base64. "hi" = [104,105]. + const raw = `{"eventType":"message_received","messageHash":"0xabc",` + + `"message":{"contentTopic":"/app/1/c/proto","payload":[104,105],` + + `"meta":[1,2],"version":1,"timestamp":1717000000000000000,"ephemeral":true}}` + ev, err := decodeEvent(raw) + if err != nil { + t.Fatalf("decodeEvent: %v", err) + } + got, ok := ev.(MessageReceivedEvent) + if !ok { + t.Fatalf("got %T, want MessageReceivedEvent", ev) + } + if got.MessageHash != "0xabc" { + t.Errorf("messageHash = %q", got.MessageHash) + } + if !bytes.Equal(got.Message.Payload, []byte("hi")) { + t.Errorf("payload = %v, want %v", got.Message.Payload, []byte("hi")) + } + if !bytes.Equal(got.Message.Meta, []byte{1, 2}) { + t.Errorf("meta = %v", got.Message.Meta) + } + if got.Message.ContentTopic != "/app/1/c/proto" { + t.Errorf("contentTopic = %q", got.Message.ContentTopic) + } + if got.Message.Version != 1 || got.Message.Timestamp != 1717000000000000000 || !got.Message.Ephemeral { + t.Errorf("scalar fields wrong: %+v", got.Message) + } + }) + + t.Run("message_received with base64 payload (robustness)", func(t *testing.T) { + const raw = `{"eventType":"message_received","messageHash":"0x1",` + + `"message":{"contentTopic":"/a/1/b/proto","payload":"aGk=","ephemeral":false}}` + ev, err := decodeEvent(raw) + if err != nil { + t.Fatalf("decodeEvent: %v", err) + } + if got := ev.(MessageReceivedEvent); !bytes.Equal(got.Message.Payload, []byte("hi")) { + t.Errorf("payload = %v", got.Message.Payload) + } + }) + + t.Run("message_sent", func(t *testing.T) { + ev, err := decodeEvent(`{"eventType":"message_sent","requestId":"req-1","messageHash":"0x9"}`) + if err != nil { + t.Fatal(err) + } + got, ok := ev.(MessageSentEvent) + if !ok || got.RequestID != "req-1" || got.MessageHash != "0x9" { + t.Fatalf("got %#v", ev) + } + }) + + t.Run("message_propagated", func(t *testing.T) { + ev, err := decodeEvent(`{"eventType":"message_propagated","requestId":"req-2","messageHash":"0x8"}`) + if err != nil { + t.Fatal(err) + } + if got, ok := ev.(MessagePropagatedEvent); !ok || got.RequestID != "req-2" { + t.Fatalf("got %#v", ev) + } + }) + + t.Run("message_error", func(t *testing.T) { + ev, err := decodeEvent(`{"eventType":"message_error","requestId":"req-3","messageHash":"0x7","error":"boom"}`) + if err != nil { + t.Fatal(err) + } + got, ok := ev.(MessageErrorEvent) + if !ok || got.RequestID != "req-3" || got.Err != "boom" { + t.Fatalf("got %#v", ev) + } + }) + + t.Run("connection_status_change", func(t *testing.T) { + for in, want := range map[string]ConnectionStatus{ + "Connected": Connected, + "PartiallyConnected": PartiallyConnected, + "Disconnected": Disconnected, + } { + raw := `{"eventType":"connection_status_change","connectionStatus":"` + in + `"}` + ev, err := decodeEvent(raw) + if err != nil { + t.Fatalf("%s: %v", in, err) + } + if got := ev.(ConnectionStatusEvent); got.Status != want { + t.Errorf("%s -> %v, want %v", in, got.Status, want) + } + } + }) + + t.Run("unknown event type is ignored", func(t *testing.T) { + ev, err := decodeEvent(`{"eventType":"something_new","foo":1}`) + if err != nil || ev != nil { + t.Fatalf("got ev=%v err=%v, want nil,nil", ev, err) + } + }) +} diff --git a/pkg/messaging/messaging_client.go b/pkg/messaging/messaging_client.go new file mode 100644 index 0000000..d195828 --- /dev/null +++ b/pkg/messaging/messaging_client.go @@ -0,0 +1,109 @@ +package messaging + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "sync" + + "github.com/logos-messaging/logos-delivery-go-bindings/internal/ffi/liblogosdelivery" +) + +// eventBufferSize bounds the buffered Events channel. Events are dropped (never +// blocked) if a consumer falls behind, so the FFI worker thread is never stalled. +const eventBufferSize = 1024 + +// MessagingClient is a logos-delivery Messaging API client (the Nim +// MessagingClient). Create it with New, then Start it; consume incoming events +// from Events(); release it with Close. +type MessagingClient struct { + h liblogosdelivery.Handle + events chan Event + closeOnce sync.Once +} + +// New creates (but does not start) a MessagingClient from cfg. +func New(cfg Config) (*MessagingClient, error) { + cfgJSON, err := json.Marshal(cfg) + if err != nil { + return nil, fmt.Errorf("marshal config: %w", err) + } + h, err := liblogosdelivery.New(string(cfgJSON)) + if err != nil { + return nil, err + } + c := &MessagingClient{h: h, events: make(chan Event, eventBufferSize)} + liblogosdelivery.SetEventHandler(h, c.onEvent) + return c, nil +} + +// onEvent runs on the FFI worker thread: decode and hand off without blocking. +func (c *MessagingClient) onEvent(ret int, msg string) { + if ret != liblogosdelivery.RetOK { + return + } + ev, err := decodeEvent(msg) + if err != nil || ev == nil { + return + } + select { + case c.events <- ev: + default: + // Consumer not keeping up; drop rather than block the worker thread. + } +} + +// Start starts the client's protocols and Messaging API services. +func (c *MessagingClient) Start() error { return liblogosdelivery.Start(c.h) } + +// Stop stops the client. It can be started again. +func (c *MessagingClient) Stop() error { return liblogosdelivery.Stop(c.h) } + +// Close stops tracking events and releases the underlying node context. The +// Events channel is closed. The MessagingClient must not be used afterwards. +func (c *MessagingClient) Close() error { + err := liblogosdelivery.Destroy(c.h) + c.closeOnce.Do(func() { close(c.events) }) + return err +} + +// Subscribe subscribes to a content topic so messages on it are received. +func (c *MessagingClient) Subscribe(topic ContentTopic) error { + return liblogosdelivery.Subscribe(c.h, topic) +} + +// Unsubscribe stops receiving messages on a content topic. +func (c *MessagingClient) Unsubscribe(topic ContentTopic) error { + return liblogosdelivery.Unsubscribe(c.h, topic) +} + +// Send publishes env and returns the RequestID to correlate with the later +// MessageSentEvent / MessagePropagatedEvent / MessageErrorEvent. The send is +// fire-and-queue: ctx cancellation is honoured before dispatch only. +func (c *MessagingClient) Send(ctx context.Context, env Envelope) (RequestID, error) { + if err := ctx.Err(); err != nil { + return "", err + } + msg, err := json.Marshal(struct { + ContentTopic string `json:"contentTopic"` + Payload string `json:"payload"` + Ephemeral bool `json:"ephemeral"` + }{ + ContentTopic: string(env.ContentTopic), + Payload: base64.StdEncoding.EncodeToString(env.Payload), + Ephemeral: env.Ephemeral, + }) + if err != nil { + return "", fmt.Errorf("marshal envelope: %w", err) + } + id, err := liblogosdelivery.Send(c.h, string(msg)) + if err != nil { + return "", err + } + return RequestID(id), nil +} + +// Events returns the channel of incoming Messaging API events. Type-switch over +// the concrete Event types. The channel is closed by Close. +func (c *MessagingClient) Events() <-chan Event { return c.events }