feat: integrate topic health change event (#10)

This commit is contained in:
gabrielmer 2025-01-07 15:44:56 +01:00 committed by GitHub
parent 9bb0e2656f
commit 96fb32a76e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 93 additions and 11 deletions

View File

@ -340,6 +340,7 @@ import (
const requestTimeout = 30 * time.Second const requestTimeout = 30 * time.Second
const MsgChanBufferSize = 100 const MsgChanBufferSize = 100
const TopicHealthChanBufferSize = 100
type WakuConfig struct { type WakuConfig struct {
Host string `json:"host,omitempty"` Host string `json:"host,omitempty"`
@ -503,10 +504,11 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
// WakuNode represents an instance of an nwaku node // WakuNode represents an instance of an nwaku node
type WakuNode struct { type WakuNode struct {
wakuCtx unsafe.Pointer wakuCtx unsafe.Pointer
logger *zap.Logger logger *zap.Logger
cancel context.CancelFunc cancel context.CancelFunc
MsgChan chan common.Envelope MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
} }
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
@ -551,11 +553,13 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
wg.Add(1) wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.logger = logger.Named("nwaku") n.logger = logger.Named("nwaku")
wg.Wait() wg.Wait()
// Notice that the events for self node are handled by the 'MyEventCallback' method // Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx) C.cGoWakuSetEventCallback(n.wakuCtx)
registerNode(n)
return n, nil return n, nil
} }
@ -632,6 +636,11 @@ type jsonEvent struct {
EventType string `json:"eventType"` EventType string `json:"eventType"`
} }
type topicHealth struct {
PubsubTopic string `json:"pubsubTopic"`
TopicHealth string `json:"topicHealth"`
}
func (n *WakuNode) OnEvent(eventStr string) { func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{} jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent) err := json.Unmarshal([]byte(eventStr), &jsonEvent)
@ -643,6 +652,8 @@ func (n *WakuNode) OnEvent(eventStr string) {
switch jsonEvent.EventType { switch jsonEvent.EventType {
case "message": case "message":
n.parseMessageEvent(eventStr) n.parseMessageEvent(eventStr)
case "relay_topic_health_change":
n.parseTopicHealthChangeEvent(eventStr)
} }
} }
@ -654,6 +665,16 @@ func (n *WakuNode) parseMessageEvent(eventStr string) {
n.MsgChan <- envelope n.MsgChan <- envelope
} }
func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) {
topicHealth := topicHealth{}
err := json.Unmarshal([]byte(eventStr), &topicHealth)
if err != nil {
n.logger.Error("could not parse topic health change", zap.Error(err))
}
n.TopicHealthChan <- topicHealth
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string var pubsubTopic string
if len(optPubsubTopic) == 0 { if len(optPubsubTopic) == 0 {
@ -1046,7 +1067,6 @@ func (n *WakuNode) Start() error {
C.cGoWakuStart(n.wakuCtx, resp) C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait() wg.Wait()
if C.getRet(resp) == C.RET_OK { if C.getRet(resp) == C.RET_OK {
registerNode(n)
return nil return nil
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"slices" "slices"
"sync"
"testing" "testing"
"time" "time"
@ -467,22 +466,85 @@ func TestRelay(t *testing.T) {
defer cancel2() defer cancel2()
senderNode.RelayPublish(ctx2, message, pubsubTopic) senderNode.RelayPublish(ctx2, message, pubsubTopic)
wg := sync.WaitGroup{}
wg.Add(1)
// Wait to receive message // Wait to receive message
select { select {
case envelope := <-receiverNode.node.MsgChan: case envelope := <-receiverNode.node.MsgChan:
require.NotNil(t, envelope, "Envelope should be received") require.NotNil(t, envelope, "Envelope should be received")
require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match") require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match") require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
wg.Done()
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Fatal("Timeout: No message received within 10 seconds") t.Fatal("Timeout: No message received within 10 seconds")
} }
wg.Wait()
// Stop nodes // Stop nodes
require.NoError(t, senderNode.Stop()) require.NoError(t, senderNode.Stop())
require.NoError(t, receiverNode.Stop()) require.NoError(t, receiverNode.Stop())
} }
func TestTopicHealth(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
// start node1
wakuConfig1 := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: clusterId,
Shards: []uint16{shardId},
Discv5UdpPort: 9050,
TcpPort: 60050,
}
node1, err := New(&wakuConfig1, logger.Named("node1"))
require.NoError(t, err)
require.NoError(t, node1.Start())
time.Sleep(1 * time.Second)
// start node2
wakuConfig2 := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: clusterId,
Shards: []uint16{shardId},
Discv5UdpPort: 9051,
TcpPort: 60051,
}
node2, err := New(&wakuConfig2, logger.Named("node2"))
require.NoError(t, err)
require.NoError(t, node2.Start())
time.Sleep(1 * time.Second)
multiaddr2, err := node2.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, multiaddr2)
// node1 dials node2 so they become peers
err = node1.DialPeer(multiaddr2[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
peerCount1, err := node1.PeerCount()
require.NoError(t, err)
require.True(t, peerCount1 == 1, "node1 should have 1 peer")
peerCount2, err := node2.PeerCount()
require.NoError(t, err)
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
// Wait to receive topic health update
select {
case topicHealth := <-node2.node.TopicHealthChan:
require.NotNil(t, topicHealth, "topicHealth should be updated")
require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy")
require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard")
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No topic health event received within 10 seconds")
}
// Stop nodes
require.NoError(t, node1.Stop())
require.NoError(t, node2.Stop())
}