From 8ccd9d0364f303335d8047d3c42f821f17c51528 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 16 Dec 2024 13:46:30 +0100 Subject: [PATCH] adding topic health channel --- waku/nwaku.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index cb72564..c1cf92f 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -328,6 +328,7 @@ import ( const requestTimeout = 30 * time.Second const MsgChanBufferSize = 100 +const TopicHealthChanBufferSize = 100 type WakuConfig struct { Host string `json:"host,omitempty"` @@ -453,10 +454,11 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - logger *zap.Logger - cancel context.CancelFunc - MsgChan chan common.Envelope + wakuCtx unsafe.Pointer + logger *zap.Logger + cancel context.CancelFunc + MsgChan chan common.Envelope + TopicHealthChan chan topicHealth } func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { @@ -501,6 +503,7 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) + n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.logger = logger.Named("nwaku") wg.Wait() @@ -583,6 +586,11 @@ type jsonEvent struct { EventType string `json:"eventType"` } +type topicHealth struct { + PubsubTopic string `json:"pubsubTopic"` + TopicHealth string `json:"topicHealth"` +} + func (n *WakuNode) OnEvent(eventStr string) { fmt.Println("---------- GABRIEL received event: ", eventStr) jsonEvent := jsonEvent{} @@ -597,6 +605,7 @@ func (n *WakuNode) OnEvent(eventStr string) { n.parseMessageEvent(eventStr) case "relay_topic_health_change": fmt.Println("Received topic health change event") + n.parseTopicHealthChangeEvent(eventStr) } } @@ -608,6 +617,19 @@ func (n *WakuNode) parseMessageEvent(eventStr string) { n.MsgChan <- envelope } +func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { + + topicHealth := topicHealth{} + err := json.Unmarshal([]byte(eventStr), &topicHealth) + if err != nil { + n.logger.Error("could not parse topic health change", zap.Error(err)) + } + + fmt.Println("-------- topicHealth.PubsubTopic: ", topicHealth.PubsubTopic) + fmt.Println("-------- topicHealth.TopicHealth: ", topicHealth.TopicHealth) + n.TopicHealthChan <- topicHealth +} + func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 {