From 96fb32a76e3a37530e78dee54f3fc98bcf07ef54 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:44:56 +0100 Subject: [PATCH] feat: integrate topic health change event (#10) --- waku/nwaku.go | 30 +++++++++++++++---- waku/nwaku_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 93 insertions(+), 11 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index e7632b4..8434ea0 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -340,6 +340,7 @@ import ( const requestTimeout = 30 * time.Second const MsgChanBufferSize = 100 +const TopicHealthChanBufferSize = 100 type WakuConfig struct { Host string `json:"host,omitempty"` @@ -503,10 +504,11 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - logger *zap.Logger - cancel context.CancelFunc - MsgChan chan common.Envelope + wakuCtx unsafe.Pointer + logger *zap.Logger + cancel context.CancelFunc + MsgChan chan common.Envelope + TopicHealthChan chan topicHealth } func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { @@ -551,11 +553,13 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) + n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.logger = logger.Named("nwaku") wg.Wait() // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) + registerNode(n) return n, nil } @@ -632,6 +636,11 @@ type jsonEvent struct { EventType string `json:"eventType"` } +type topicHealth struct { + PubsubTopic string `json:"pubsubTopic"` + TopicHealth string `json:"topicHealth"` +} + func (n *WakuNode) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) @@ -643,6 +652,8 @@ func (n *WakuNode) OnEvent(eventStr string) { switch jsonEvent.EventType { case "message": n.parseMessageEvent(eventStr) + case "relay_topic_health_change": + n.parseTopicHealthChangeEvent(eventStr) } } @@ -654,6 +665,16 @@ func (n *WakuNode) parseMessageEvent(eventStr string) { n.MsgChan <- envelope } +func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { + + topicHealth := topicHealth{} + err := json.Unmarshal([]byte(eventStr), &topicHealth) + if err != nil { + n.logger.Error("could not parse topic health change", zap.Error(err)) + } + n.TopicHealthChan <- topicHealth +} + func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { @@ -1046,7 +1067,6 @@ func (n *WakuNode) Start() error { C.cGoWakuStart(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { - registerNode(n) return nil } diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 9d5ce18..4c5e6d2 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "slices" - "sync" "testing" "time" @@ -467,22 +466,85 @@ func TestRelay(t *testing.T) { defer cancel2() senderNode.RelayPublish(ctx2, message, pubsubTopic) - wg := sync.WaitGroup{} - wg.Add(1) - // Wait to receive message select { case envelope := <-receiverNode.node.MsgChan: require.NotNil(t, envelope, "Envelope should be received") require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match") require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match") - wg.Done() case <-time.After(10 * time.Second): t.Fatal("Timeout: No message received within 10 seconds") } - wg.Wait() // Stop nodes require.NoError(t, senderNode.Stop()) require.NoError(t, receiverNode.Stop()) } + +func TestTopicHealth(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + clusterId := uint16(16) + shardId := uint16(64) + + // start node1 + wakuConfig1 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9050, + TcpPort: 60050, + } + + node1, err := New(&wakuConfig1, logger.Named("node1")) + require.NoError(t, err) + require.NoError(t, node1.Start()) + time.Sleep(1 * time.Second) + + // start node2 + wakuConfig2 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9051, + TcpPort: 60051, + } + node2, err := New(&wakuConfig2, logger.Named("node2")) + require.NoError(t, err) + require.NoError(t, node2.Start()) + time.Sleep(1 * time.Second) + multiaddr2, err := node2.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, multiaddr2) + + // node1 dials node2 so they become peers + err = node1.DialPeer(multiaddr2[0]) + require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that both nodes now have one connected peer + peerCount1, err := node1.PeerCount() + require.NoError(t, err) + require.True(t, peerCount1 == 1, "node1 should have 1 peer") + peerCount2, err := node2.PeerCount() + require.NoError(t, err) + require.True(t, peerCount2 == 1, "node2 should have 1 peer") + + // Wait to receive topic health update + select { + case topicHealth := <-node2.node.TopicHealthChan: + require.NotNil(t, topicHealth, "topicHealth should be updated") + require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy") + require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard") + case <-time.After(10 * time.Second): + t.Fatal("Timeout: No topic health event received within 10 seconds") + } + + // Stop nodes + require.NoError(t, node1.Stop()) + require.NoError(t, node2.Stop()) + +}