From 56346c6b1a14c3d8d3d2d6631f9442e493545aac Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 4 Apr 2021 13:05:33 -0400 Subject: [PATCH] fix: mark subscriptions as closed --- waku/v2/node/wakunode2.go | 35 ++++++++++++++-------------- waku/v2/protocol/waku_message.pb.go | 36 +++++++++++++++++++++-------- waku/v2/protocol/waku_message.proto | 1 + 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0c60752a..9c3a1f12 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -36,17 +36,6 @@ const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" type Message []byte -type WakuInfo struct { - // NOTE One for simplicity, can extend later as needed - listenStr string - multiaddrStrings []byte -} - -type MessagePair struct { - a *Topic - b *protocol.WakuMessage -} - type Subscription struct { C chan *common.Envelope closed bool @@ -113,10 +102,8 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, op w.ctx = ctx w.topics = make(map[Topic]*wakurelay.Topic) - hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) - for _, addr := range host.Addrs() { - fullAddr := addr.Encapsulate(hostInfo) - log.Info("Listening on", fullAddr) + for _, addr := range w.ListenAddresses() { + log.Info("Listening on", addr) } return w, nil @@ -142,6 +129,15 @@ func (w *WakuNode) ID() string { return w.host.ID().Pretty() } +func (w *WakuNode) ListenAddresses() []string { + hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) + var result []string + for _, addr := range w.host.Addrs() { + result = append(result, addr.Encapsulate(hostInfo).String()) + } + return result +} + func (w *WakuNode) PubSub() *wakurelay.PubSub { return w.pubsub } @@ -261,8 +257,12 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { case <-nextMsgTicker.C: msg, err := sub.Next(ctx) if err != nil { - log.Error("error receiving message", err) - close(subscription.quit) + subscription.mutex.Lock() + defer subscription.mutex.Unlock() + if !subscription.closed { + subscription.closed = true + close(subscription.quit) + } return } @@ -291,6 +291,7 @@ func (subs *Subscription) Unsubscribe() { subs.mutex.Lock() defer subs.mutex.Unlock() if !subs.closed { + subs.closed = true close(subs.quit) } } diff --git a/waku/v2/protocol/waku_message.pb.go b/waku/v2/protocol/waku_message.pb.go index 7b966641..726867f0 100644 --- a/waku/v2/protocol/waku_message.pb.go +++ b/waku/v2/protocol/waku_message.pb.go @@ -1,12 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0-devel +// protoc-gen-go v1.25.0 // protoc v3.14.0 // source: waku_message.proto package protocol import ( + proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -20,15 +21,20 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + type WakuMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"` - ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"` - Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"` - Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"` + ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"` + Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"` + Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"` + Timestamp *float64 `protobuf:"fixed64,5,opt,name=timestamp,proto3,oneof" json:"timestamp,omitempty"` } func (x *WakuMessage) Reset() { @@ -91,11 +97,18 @@ func (x *WakuMessage) GetProof() []byte { return nil } +func (x *WakuMessage) GetTimestamp() float64 { + if x != nil && x.Timestamp != nil { + return *x.Timestamp + } + return 0 +} + var File_waku_message_proto protoreflect.FileDescriptor var file_waku_message_proto_rawDesc = []byte{ 0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xc2, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xf3, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a, @@ -105,10 +118,13 @@ var file_waku_message_proto_rawDesc = []byte{ 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x88, 0x01, 0x01, - 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x0f, 0x0a, 0x0d, - 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0a, 0x0a, - 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x70, 0x72, - 0x6f, 0x6f, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x21, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x01, 0x48, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, + 0x0f, 0x0a, 0x0d, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, + 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, + 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/waku/v2/protocol/waku_message.proto b/waku/v2/protocol/waku_message.proto index 157561b6..1e5b5ab5 100644 --- a/waku/v2/protocol/waku_message.proto +++ b/waku/v2/protocol/waku_message.proto @@ -7,4 +7,5 @@ message WakuMessage { optional uint32 contentTopic = 2; optional uint32 version = 3; optional bytes proof = 4; + optional double timestamp = 5; } \ No newline at end of file