fix: mark subscriptions as closed

This commit is contained in:
Richard Ramos 2021-04-04 13:05:33 -04:00
parent 3ece0901b8
commit 56346c6b1a
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
3 changed files with 45 additions and 27 deletions

View File

@ -36,17 +36,6 @@ const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
type Message []byte
type WakuInfo struct {
// NOTE One for simplicity, can extend later as needed
listenStr string
multiaddrStrings []byte
}
type MessagePair struct {
a *Topic
b *protocol.WakuMessage
}
type Subscription struct {
C chan *common.Envelope
closed bool
@ -113,10 +102,8 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, op
w.ctx = ctx
w.topics = make(map[Topic]*wakurelay.Topic)
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
for _, addr := range host.Addrs() {
fullAddr := addr.Encapsulate(hostInfo)
log.Info("Listening on", fullAddr)
for _, addr := range w.ListenAddresses() {
log.Info("Listening on", addr)
}
return w, nil
@ -142,6 +129,15 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty()
}
func (w *WakuNode) ListenAddresses() []string {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty()))
var result []string
for _, addr := range w.host.Addrs() {
result = append(result, addr.Encapsulate(hostInfo).String())
}
return result
}
func (w *WakuNode) PubSub() *wakurelay.PubSub {
return w.pubsub
}
@ -261,8 +257,12 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
log.Error("error receiving message", err)
close(subscription.quit)
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
if !subscription.closed {
subscription.closed = true
close(subscription.quit)
}
return
}
@ -291,6 +291,7 @@ func (subs *Subscription) Unsubscribe() {
subs.mutex.Lock()
defer subs.mutex.Unlock()
if !subs.closed {
subs.closed = true
close(subs.quit)
}
}

View File

@ -1,12 +1,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0-devel
// protoc-gen-go v1.25.0
// protoc v3.14.0
// source: waku_message.proto
package protocol
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@ -20,15 +21,20 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type WakuMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"`
ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"`
Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"`
Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"`
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"`
ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"`
Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"`
Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"`
Timestamp *float64 `protobuf:"fixed64,5,opt,name=timestamp,proto3,oneof" json:"timestamp,omitempty"`
}
func (x *WakuMessage) Reset() {
@ -91,11 +97,18 @@ func (x *WakuMessage) GetProof() []byte {
return nil
}
func (x *WakuMessage) GetTimestamp() float64 {
if x != nil && x.Timestamp != nil {
return *x.Timestamp
}
return 0
}
var File_waku_message_proto protoreflect.FileDescriptor
var file_waku_message_proto_rawDesc = []byte{
0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xc2,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xf3,
0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d,
0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48,
0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a,
@ -105,10 +118,13 @@ var file_waku_message_proto_rawDesc = []byte{
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x04,
0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x88, 0x01, 0x01,
0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x0f, 0x0a, 0x0d,
0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0a, 0x0a,
0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x70, 0x72,
0x6f, 0x6f, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x12, 0x21, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20,
0x01, 0x28, 0x01, 0x48, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42,
0x0f, 0x0a, 0x0d, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63,
0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06,
0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -7,4 +7,5 @@ message WakuMessage {
optional uint32 contentTopic = 2;
optional uint32 version = 3;
optional bytes proof = 4;
optional double timestamp = 5;
}