diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index a66c4f24..dbfd7dd2 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -30,6 +30,7 @@ type Chat struct { self peer.ID contentTopic string useV1Payload bool + useLightPush bool nick string } @@ -44,6 +45,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic contentTopic: contentTopic, nick: nickname, useV1Payload: useV1Payload, + useLightPush: useLightPush, Messages: make(chan *pb.Chat2Message, 1024), } @@ -58,7 +60,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic return nil, err } } else { - sub, err := n.Subscribe(ctx, nil) + sub, err := n.Relay().Subscribe(ctx, nil) if err != nil { return nil, err } @@ -120,7 +122,13 @@ func (cr *Chat) Publish(ctx context.Context, message string) error { Timestamp: timestamp, } - _, err = cr.node.Publish(ctx, wakuMsg, nil) + if cr.useLightPush { + _, err = cr.node.Lightpush().Publish(ctx, wakuMsg, nil) + + } else { + _, err = cr.node.Relay().Publish(ctx, wakuMsg, nil) + + } return err } diff --git a/tests/connection_test.go b/tests/connection_test.go index 5a9403f9..873bf9a1 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -40,7 +40,7 @@ func TestBasicSendingReceiving(t *testing.T) { require.NoError(t, write(ctx, wakuNode, "test")) - sub, err := wakuNode.Subscribe(ctx, nil) + sub, err := wakuNode.Relay().Subscribe(ctx, nil) require.NoError(t, err) value := <-sub.C @@ -79,6 +79,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro Timestamp: timestamp, } - _, err = wakuNode.Publish(ctx, msg, nil) + _, err = wakuNode.Relay().Publish(ctx, msg, nil) return err } diff --git a/waku/node.go b/waku/node.go index f78e1fa3..d73f4212 100644 --- a/waku/node.go +++ b/waku/node.go @@ -181,7 +181,7 @@ func Execute(options Options) { if !options.Relay.Disable { for _, t := range options.Relay.Topics { nodeTopic := relay.Topic(t) - _, err := wakuNode.Subscribe(ctx, &nodeTopic) + _, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic) failOnErr(err, "Error subscring to topic") } } diff --git a/waku/v2/node/broadcast.go b/waku/v2/broadcast.go similarity index 96% rename from waku/v2/node/broadcast.go rename to waku/v2/broadcast.go index a6cfac9c..88d99efb 100644 --- a/waku/v2/node/broadcast.go +++ b/waku/v2/broadcast.go @@ -1,4 +1,4 @@ -package node +package v2 import ( "github.com/status-im/go-waku/waku/v2/protocol" @@ -23,7 +23,7 @@ type Broadcaster interface { // Unregister a channel so that it no longer receives broadcasts. Unregister(chan<- *protocol.Envelope) // Shut this broadcaster down. - Close() error + Close() // Submit a new object to all subscribers Submit(*protocol.Envelope) } @@ -78,9 +78,8 @@ func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { } // Closes the broadcaster. Used to stop receiving new subscribers -func (b *broadcaster) Close() error { +func (b *broadcaster) Close() { close(b.reg) - return nil } // Submits an Envelope to be broadcasted among all registered subscriber channels diff --git a/waku/v2/node/broadcast_test.go b/waku/v2/broadcast_test.go similarity index 88% rename from waku/v2/node/broadcast_test.go rename to waku/v2/broadcast_test.go index 3ed35261..aba0139e 100644 --- a/waku/v2/node/broadcast_test.go +++ b/waku/v2/broadcast_test.go @@ -1,11 +1,10 @@ -package node +package v2 import ( "sync" "testing" "github.com/status-im/go-waku/waku/v2/protocol" - "github.com/stretchr/testify/require" ) // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 @@ -41,6 +40,5 @@ func TestBroadcast(t *testing.T) { func TestBroadcastCleanup(t *testing.T) { b := NewBroadcaster(100) b.Register(make(chan *protocol.Envelope)) - err := b.Close() - require.NoError(t, err) + b.Close() } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 608ea635..70bb4813 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -4,10 +4,8 @@ import ( "context" "errors" "fmt" - "sync" "time" - proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -20,9 +18,9 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" - "go.opencensus.io/tag" rendezvous "github.com/status-im/go-waku-rendezvous" + v2 "github.com/status-im/go-waku/waku/v2" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" @@ -48,10 +46,7 @@ type WakuNode struct { ping *ping.PingService store *store.WakuStore - subscriptions map[relay.Topic][]*Subscription - subscriptionsMutex sync.Mutex - - bcaster Broadcaster + bcaster v2.Broadcaster filters filter.Filters @@ -102,11 +97,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } w := new(WakuNode) - w.bcaster = NewBroadcaster(1024) + w.bcaster = v2.NewBroadcaster(1024) w.host = host w.cancel = cancel w.ctx = ctx - w.subscriptions = make(map[relay.Topic][]*Subscription) w.opts = params w.quit = make(chan struct{}) @@ -156,11 +150,9 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) } - if w.opts.enableRelay { - err := w.mountRelay(w.opts.wOpts...) - if err != nil { - return err - } + err := w.mountRelay(w.opts.wOpts...) + if err != nil { + return err } w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) @@ -177,16 +169,27 @@ func (w *WakuNode) Start() error { } } + // Subscribe store to topic + if w.opts.storeMsgs { + log.Info("Subscribing store to broadcaster") + w.bcaster.Register(w.store.MsgC) + } + + if w.filter != nil { + log.Info("Subscribing filter to broadcaster") + w.bcaster.Register(w.filter.MsgC) + } + return nil } func (w *WakuNode) Stop() { - w.subscriptionsMutex.Lock() - defer w.subscriptionsMutex.Unlock() defer w.cancel() close(w.quit) + w.bcaster.Close() + defer w.connectionNotif.Close() defer w.protocolEventSub.Close() defer w.identificationEventSub.Close() @@ -195,15 +198,6 @@ func (w *WakuNode) Stop() { w.rendezvous.Stop() } - if w.relay != nil { - for _, topic := range w.relay.Topics() { - for _, sub := range w.subscriptions[topic] { - sub.Unsubscribe() - } - } - w.subscriptions = nil - } - if w.filter != nil { w.filter.Stop() for _, filter := range w.filters { @@ -212,6 +206,7 @@ func (w *WakuNode) Stop() { w.filters = nil } + w.relay.Stop() w.lightPush.Stop() w.store.Stop() @@ -253,14 +248,16 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...) if err != nil { return err } - _, err = w.Subscribe(w.ctx, nil) - if err != nil { - return err + if w.opts.enableRelay { + _, err = w.relay.Subscribe(w.ctx, nil) + if err != nil { + return err + } } // TODO: rlnRelay @@ -346,93 +343,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer. return &info.ID, w.addPeer(info, protocolID) } -func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) { - // Subscribes to a PubSub topic. - // NOTE The data field SHOULD be decoded as a WakuMessage. - if node.relay == nil { - return nil, errors.New("WakuRelay hasn't been set") - } - - t := relay.GetTopic(topic) - sub, isNew, err := node.relay.Subscribe(t) - - // Subscribe store to topic - if isNew && node.opts.storeMsgs { - log.Info("Subscribing store to topic ", t) - node.bcaster.Register(node.store.MsgC) - } - - // Subscribe filter - if isNew && node.filter != nil { - log.Info("Subscribing filter to topic ", t) - node.bcaster.Register(node.filter.MsgC) - } - - if err != nil { - return nil, err - } - - // Create client subscription - subscription := new(Subscription) - subscription.closed = false - subscription.C = make(chan *protocol.Envelope, 1024) // To avoid blocking - subscription.quit = make(chan struct{}) - - node.subscriptionsMutex.Lock() - defer node.subscriptionsMutex.Unlock() - - node.subscriptions[t] = append(node.subscriptions[t], subscription) - - node.bcaster.Register(subscription.C) - - go node.subscribeToTopic(t, subscription, sub) - - return subscription, nil -} - -func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) { - nextMsgTicker := time.NewTicker(time.Millisecond * 10) - defer nextMsgTicker.Stop() - - ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay")) - if err != nil { - log.Error(err) - return - } - - for { - select { - case <-subscription.quit: - subscription.mutex.Lock() - node.bcaster.Unregister(subscription.C) // Remove from broadcast list - close(subscription.C) - subscription.mutex.Unlock() - case <-nextMsgTicker.C: - msg, err := sub.Next(ctx) - if err != nil { - subscription.mutex.Lock() - for _, subscription := range node.subscriptions[t] { - subscription.Unsubscribe() - } - subscription.mutex.Unlock() - return - } - - stats.Record(ctx, metrics.Messages.M(1)) - - wakuMessage := &pb.WakuMessage{} - if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - log.Error("could not decode message", err) - return - } - - envelope := protocol.NewEnvelope(wakuMessage, string(t)) - - node.bcaster.Submit(envelope) - } - } -} - // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) { @@ -447,7 +357,7 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // ContentFilterChan takes MessagePush structs subs, err := node.filter.Subscribe(ctx, f) - if subs.RequestID == "" || err != nil { + if err != nil || subs.RequestID == "" { // Failed to subscribe log.Error("remote subscription to filter failed", err) return @@ -540,22 +450,6 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi return nil } -func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { - if message == nil { - return nil, errors.New("message can't be null") - } - - if node.relay == nil { - return nil, errors.New("WakuRelay hasn't been set") - } - - hash, err := node.relay.Publish(ctx, message, topic) - if err != nil { - return nil, err - } - return hash, nil -} - func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 8a004b5c..6ad82a23 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -9,25 +9,25 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peerstore" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/status-im/go-waku/tests" + v2 "github.com/status-im/go-waku/waku/v2" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/stretchr/testify/require" ) -func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *pubsub.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - sub, _, err := relay.Subscribe(topic) + sub, err := relay.Subscribe(context.Background(), &topic) require.NoError(t, err) return relay, sub, host @@ -48,11 +48,11 @@ func TestWakuLightPush(t *testing.T) { var testTopic relay.Topic = "/waku/2/go/lightpush/test" node1, sub1, host1 := makeWakuRelay(t, testTopic) defer node1.Stop() - defer sub1.Cancel() + defer sub1.Unsubscribe() node2, sub2, host2 := makeWakuRelay(t, testTopic) defer node2.Stop() - defer sub2.Cancel() + defer sub2.Unsubscribe() ctx := context.Background() lightPushNode2 := NewWakuLightPush(ctx, host2, node2) @@ -92,21 +92,15 @@ func TestWakuLightPush(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := sub1.Next(context.Background()) - require.NoError(t, err) - - _, err = sub1.Next(context.Background()) - require.NoError(t, err) + <-sub1.C + <-sub1.C }() wg.Add(1) go func() { defer wg.Done() - _, err := sub2.Next(context.Background()) - require.NoError(t, err) - - _, err = sub2.Next(context.Background()) - require.NoError(t, err) + <-sub2.C + <-sub2.C }() // Verifying succesful request @@ -127,10 +121,9 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil) - err = client.Start() + require.Errorf(t, err, "relay is required") } diff --git a/waku/v2/node/subscription.go b/waku/v2/protocol/relay/subscription.go similarity index 80% rename from waku/v2/node/subscription.go rename to waku/v2/protocol/relay/subscription.go index 10a46b52..4adc0a83 100644 --- a/waku/v2/node/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -1,4 +1,4 @@ -package node +package relay import ( "sync" @@ -12,23 +12,21 @@ type Subscription struct { C chan *protocol.Envelope closed bool - mutex sync.Mutex + once sync.Once quit chan struct{} } // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { - subs.mutex.Lock() - defer subs.mutex.Unlock() - if !subs.closed { - close(subs.quit) + subs.once.Do(func() { subs.closed = true - } + close(subs.quit) + close(subs.C) + + }) } // IsClosed determine whether a Subscription is still open for receiving messages func (subs *Subscription) IsClosed() bool { - subs.mutex.Lock() - defer subs.mutex.Unlock() return subs.closed } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index b5f22d56..90b86eb2 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -4,17 +4,22 @@ import ( "context" "crypto/sha256" "errors" + "fmt" "sync" proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" - - pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.opencensus.io/stats" + "go.opencensus.io/tag" pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + v2 "github.com/status-im/go-waku/waku/v2" + "github.com/status-im/go-waku/waku/v2/metrics" + waku_proto "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" ) var log = logging.Logger("wakurelay") @@ -32,6 +37,11 @@ type WakuRelay struct { topicsMutex sync.Mutex wakuRelayTopics map[Topic]*pubsub.Topic relaySubs map[Topic]*pubsub.Subscription + + bcaster v2.Broadcaster + + subscriptions map[Topic][]*Subscription + subscriptionsMutex sync.Mutex } // Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn @@ -40,12 +50,14 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { return string(hash[:]) } -func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) { +func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h w.topics = make(map[Topic]bool) w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) w.relaySubs = make(map[Topic]*pubsub.Subscription) + w.subscriptions = make(map[Topic][]*Subscription) + w.bcaster = bcaster // default options required by WakuRelay opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) @@ -113,25 +125,24 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { return pubSubTopic, nil } -func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) { +func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] if !ok { pubSubTopic, err := w.upsertTopic(topic) if err != nil { - return nil, false, err + return nil, err } sub, err = pubSubTopic.Subscribe() if err != nil { - return nil, false, err + return nil, err } w.relaySubs[topic] = sub log.Info("Subscribing to topic ", topic) } - isNew = !ok // ok will be true if subscription already exists - return sub, isNew, nil + return sub, nil } func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { @@ -175,4 +186,96 @@ func GetTopic(topic *Topic) Topic { func (w *WakuRelay) Stop() { w.host.RemoveStreamHandler(WakuRelayID_v200) + w.subscriptionsMutex.Lock() + defer w.subscriptionsMutex.Unlock() + + for _, topic := range w.Topics() { + for _, sub := range w.subscriptions[topic] { + sub.Unsubscribe() + } + } + w.subscriptions = nil +} + +func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) { + // Subscribes to a PubSub topic. + // NOTE The data field SHOULD be decoded as a WakuMessage. + t := GetTopic(topic) + sub, err := w.subscribe(t) + + if err != nil { + return nil, err + } + + // Create client subscription + subscription := new(Subscription) + subscription.closed = false + subscription.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking + subscription.quit = make(chan struct{}) + + w.subscriptionsMutex.Lock() + defer w.subscriptionsMutex.Unlock() + + w.subscriptions[t] = append(w.subscriptions[t], subscription) + + if w.bcaster != nil { + w.bcaster.Register(subscription.C) + } + + go w.subscribeToTopic(t, subscription, sub) + + return subscription, nil +} + +func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { + msgChannel := make(chan *pubsub.Message, 1024) + go func(msgChannel chan *pubsub.Message) { + for { + msg, err := sub.Next(ctx) + if err != nil { + log.Error(fmt.Errorf("subscription failed: %w", err)) + sub.Cancel() + close(msgChannel) + for _, subscription := range w.subscriptions[Topic(sub.Topic())] { + subscription.Unsubscribe() + } + } + + msgChannel <- msg + } + }(msgChannel) + return msgChannel +} + +func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) { + ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) + if err != nil { + log.Error(err) + return + } + + subChannel := w.nextMessage(ctx, sub) + + for { + select { + case <-subscription.quit: + if w.bcaster != nil { + w.bcaster.Unregister(subscription.C) // Remove from broadcast list + } + // TODO: if there are no more relay subscriptions, close the pubsub subscription + case msg := <-subChannel: + stats.Record(ctx, metrics.Messages.M(1)) + wakuMessage := &pb.WakuMessage{} + if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { + log.Error("could not decode message", err) + return + } + + envelope := waku_proto.NewEnvelope(wakuMessage, string(t)) + + if w.bcaster != nil { + w.bcaster.Submit(envelope) + } + } + } } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 7b57ea4a..d242ef5a 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -19,18 +19,13 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host) + relay, err := NewWakuRelay(context.Background(), host, nil) defer relay.Stop() require.NoError(t, err) - sub, isNew, err := relay.Subscribe(testTopic) + sub, err := relay.subscribe(testTopic) defer sub.Cancel() require.NoError(t, err) - require.True(t, isNew) - - _, isNew, err = relay.Subscribe(testTopic) - require.NoError(t, err) - require.False(t, isNew) topics := relay.Topics() require.Equal(t, 1, len(topics)) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index aa583b6c..d138c0fc 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -707,6 +707,12 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList func (w *WakuStore) Stop() { w.started = false - close(w.MsgC) - w.h.RemoveStreamHandler(StoreID_v20beta3) + + if w.MsgC != nil { + close(w.MsgC) + } + + if w.h != nil { + w.h.RemoveStreamHandler(StoreID_v20beta3) + } }