From 8193867e4419d52be68d9307da3d11bb8e962719 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Tue, 8 Apr 2025 14:14:22 +0300 Subject: [PATCH 1/2] chore: improving type namings (#61) --- waku/common/envelope.go | 32 ++++++++++++++++---------------- waku/common/store.go | 6 +++--- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/waku/common/envelope.go b/waku/common/envelope.go index 27d6c55..a27fc68 100644 --- a/waku/common/envelope.go +++ b/waku/common/envelope.go @@ -15,7 +15,7 @@ type Envelope struct { hash MessageHash } -type tmpWakuMessageJson struct { +type wakuMessage struct { Payload []byte `json:"payload,omitempty"` ContentTopic string `json:"contentTopic,omitempty"` Version *uint32 `json:"version,omitempty"` @@ -25,32 +25,32 @@ type tmpWakuMessageJson struct { RateLimitProof []byte `json:"proof,omitempty"` } -type tmpEnvelopeStruct struct { - WakuMessage tmpWakuMessageJson `json:"wakuMessage"` - PubsubTopic string `json:"pubsubTopic"` - MessageHash MessageHash `json:"messageHash"` +type wakuEnvelope struct { + WakuMessage wakuMessage `json:"wakuMessage"` + PubsubTopic string `json:"pubsubTopic"` + MessageHash MessageHash `json:"messageHash"` } // NewEnvelope creates a new Envelope from a json string generated in nwaku func NewEnvelope(jsonEventStr string) (*Envelope, error) { - tmpEnvelopeStruct := tmpEnvelopeStruct{} - err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct) + wakuEnvelope := wakuEnvelope{} + err := json.Unmarshal([]byte(jsonEventStr), &wakuEnvelope) if err != nil { return nil, err } return &Envelope{ msg: &pb.WakuMessage{ - Payload: tmpEnvelopeStruct.WakuMessage.Payload, - ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic, - Version: tmpEnvelopeStruct.WakuMessage.Version, - Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp, - Meta: tmpEnvelopeStruct.WakuMessage.Meta, - Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral, - RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof, + Payload: wakuEnvelope.WakuMessage.Payload, + ContentTopic: wakuEnvelope.WakuMessage.ContentTopic, + Version: wakuEnvelope.WakuMessage.Version, + Timestamp: wakuEnvelope.WakuMessage.Timestamp, + Meta: wakuEnvelope.WakuMessage.Meta, + Ephemeral: wakuEnvelope.WakuMessage.Ephemeral, + RateLimitProof: wakuEnvelope.WakuMessage.RateLimitProof, }, - topic: tmpEnvelopeStruct.PubsubTopic, - hash: tmpEnvelopeStruct.MessageHash, + topic: wakuEnvelope.PubsubTopic, + hash: wakuEnvelope.MessageHash, }, nil } diff --git a/waku/common/store.go b/waku/common/store.go index 32e76ad..0ff9d4b 100644 --- a/waku/common/store.go +++ b/waku/common/store.go @@ -14,9 +14,9 @@ type StoreQueryRequest struct { } type StoreMessageResponse struct { - WakuMessage *tmpWakuMessageJson `json:"message"` - PubsubTopic string `json:"pubsubTopic"` - MessageHash MessageHash `json:"messageHash"` + WakuMessage *wakuMessage `json:"message"` + PubsubTopic string `json:"pubsubTopic"` + MessageHash MessageHash `json:"messageHash"` } type StoreQueryResponse struct { From 6d7387e88e6effa63b42798fcb97563641f1df51 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Tue, 8 Apr 2025 17:02:37 +0300 Subject: [PATCH 2/2] fix: avoid blocking nodes when channels are full (#63) --- waku/nwaku.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index aa1e464..f1e2c48 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -484,7 +484,11 @@ func (n *WakuNode) parseMessageEvent(eventStr string) { if err != nil { Error("could not parse message %v", err) } - n.MsgChan <- *envelope + select { + case n.MsgChan <- *envelope: + default: + Warn("Can't deliver message to subscription, MsgChan is full") + } } func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { @@ -494,7 +498,12 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { if err != nil { Error("could not parse topic health change %v", err) } - n.TopicHealthChan <- topicHealth + + select { + case n.TopicHealthChan <- topicHealth: + default: + Warn("Can't deliver topic health event, TopicHealthChan is full") + } } func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { @@ -504,7 +513,12 @@ func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { if err != nil { Error("could not parse connection change %v", err) } - n.ConnectionChangeChan <- connectionChange + + select { + case n.ConnectionChangeChan <- connectionChange: + default: + Warn("Can't deliver connection change event, ConnectionChangeChan is full") + } } func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {