refactor: relay (#119)

This commit is contained in:
Richard Ramos 2021-11-01 10:42:55 -04:00 committed by GitHub
parent c0ba800af7
commit b789d9900e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 185 additions and 191 deletions

View File

@ -30,6 +30,7 @@ type Chat struct {
self peer.ID self peer.ID
contentTopic string contentTopic string
useV1Payload bool useV1Payload bool
useLightPush bool
nick string nick string
} }
@ -44,6 +45,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
contentTopic: contentTopic, contentTopic: contentTopic,
nick: nickname, nick: nickname,
useV1Payload: useV1Payload, useV1Payload: useV1Payload,
useLightPush: useLightPush,
Messages: make(chan *pb.Chat2Message, 1024), Messages: make(chan *pb.Chat2Message, 1024),
} }
@ -58,7 +60,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
return nil, err return nil, err
} }
} else { } else {
sub, err := n.Subscribe(ctx, nil) sub, err := n.Relay().Subscribe(ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -120,7 +122,13 @@ func (cr *Chat) Publish(ctx context.Context, message string) error {
Timestamp: timestamp, Timestamp: timestamp,
} }
_, err = cr.node.Publish(ctx, wakuMsg, nil) if cr.useLightPush {
_, err = cr.node.Lightpush().Publish(ctx, wakuMsg, nil)
} else {
_, err = cr.node.Relay().Publish(ctx, wakuMsg, nil)
}
return err return err
} }

View File

@ -40,7 +40,7 @@ func TestBasicSendingReceiving(t *testing.T) {
require.NoError(t, write(ctx, wakuNode, "test")) require.NoError(t, write(ctx, wakuNode, "test"))
sub, err := wakuNode.Subscribe(ctx, nil) sub, err := wakuNode.Relay().Subscribe(ctx, nil)
require.NoError(t, err) require.NoError(t, err)
value := <-sub.C value := <-sub.C
@ -79,6 +79,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro
Timestamp: timestamp, Timestamp: timestamp,
} }
_, err = wakuNode.Publish(ctx, msg, nil) _, err = wakuNode.Relay().Publish(ctx, msg, nil)
return err return err
} }

View File

@ -181,7 +181,7 @@ func Execute(options Options) {
if !options.Relay.Disable { if !options.Relay.Disable {
for _, t := range options.Relay.Topics { for _, t := range options.Relay.Topics {
nodeTopic := relay.Topic(t) nodeTopic := relay.Topic(t)
_, err := wakuNode.Subscribe(ctx, &nodeTopic) _, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic)
failOnErr(err, "Error subscring to topic") failOnErr(err, "Error subscring to topic")
} }
} }

View File

@ -1,4 +1,4 @@
package node package v2
import ( import (
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
@ -23,7 +23,7 @@ type Broadcaster interface {
// Unregister a channel so that it no longer receives broadcasts. // Unregister a channel so that it no longer receives broadcasts.
Unregister(chan<- *protocol.Envelope) Unregister(chan<- *protocol.Envelope)
// Shut this broadcaster down. // Shut this broadcaster down.
Close() error Close()
// Submit a new object to all subscribers // Submit a new object to all subscribers
Submit(*protocol.Envelope) Submit(*protocol.Envelope)
} }
@ -78,9 +78,8 @@ func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
} }
// Closes the broadcaster. Used to stop receiving new subscribers // Closes the broadcaster. Used to stop receiving new subscribers
func (b *broadcaster) Close() error { func (b *broadcaster) Close() {
close(b.reg) close(b.reg)
return nil
} }
// Submits an Envelope to be broadcasted among all registered subscriber channels // Submits an Envelope to be broadcasted among all registered subscriber channels

View File

@ -1,11 +1,10 @@
package node package v2
import ( import (
"sync" "sync"
"testing" "testing"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/stretchr/testify/require"
) )
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
@ -41,6 +40,5 @@ func TestBroadcast(t *testing.T) {
func TestBroadcastCleanup(t *testing.T) { func TestBroadcastCleanup(t *testing.T) {
b := NewBroadcaster(100) b := NewBroadcaster(100)
b.Register(make(chan *protocol.Envelope)) b.Register(make(chan *protocol.Envelope))
err := b.Close() b.Close()
require.NoError(t, err)
} }

View File

@ -4,10 +4,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
@ -20,9 +18,9 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag"
rendezvous "github.com/status-im/go-waku-rendezvous" rendezvous "github.com/status-im/go-waku-rendezvous"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/filter"
@ -48,10 +46,7 @@ type WakuNode struct {
ping *ping.PingService ping *ping.PingService
store *store.WakuStore store *store.WakuStore
subscriptions map[relay.Topic][]*Subscription bcaster v2.Broadcaster
subscriptionsMutex sync.Mutex
bcaster Broadcaster
filters filter.Filters filters filter.Filters
@ -102,11 +97,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
} }
w := new(WakuNode) w := new(WakuNode)
w.bcaster = NewBroadcaster(1024) w.bcaster = v2.NewBroadcaster(1024)
w.host = host w.host = host
w.cancel = cancel w.cancel = cancel
w.ctx = ctx w.ctx = ctx
w.subscriptions = make(map[relay.Topic][]*Subscription)
w.opts = params w.opts = params
w.quit = make(chan struct{}) w.quit = make(chan struct{})
@ -156,12 +150,10 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
} }
if w.opts.enableRelay {
err := w.mountRelay(w.opts.wOpts...) err := w.mountRelay(w.opts.wOpts...)
if err != nil { if err != nil {
return err return err
} }
}
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
if w.opts.enableLightPush { if w.opts.enableLightPush {
@ -177,16 +169,27 @@ func (w *WakuNode) Start() error {
} }
} }
// Subscribe store to topic
if w.opts.storeMsgs {
log.Info("Subscribing store to broadcaster")
w.bcaster.Register(w.store.MsgC)
}
if w.filter != nil {
log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(w.filter.MsgC)
}
return nil return nil
} }
func (w *WakuNode) Stop() { func (w *WakuNode) Stop() {
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
defer w.cancel() defer w.cancel()
close(w.quit) close(w.quit)
w.bcaster.Close()
defer w.connectionNotif.Close() defer w.connectionNotif.Close()
defer w.protocolEventSub.Close() defer w.protocolEventSub.Close()
defer w.identificationEventSub.Close() defer w.identificationEventSub.Close()
@ -195,15 +198,6 @@ func (w *WakuNode) Stop() {
w.rendezvous.Stop() w.rendezvous.Stop()
} }
if w.relay != nil {
for _, topic := range w.relay.Topics() {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
}
w.subscriptions = nil
}
if w.filter != nil { if w.filter != nil {
w.filter.Stop() w.filter.Stop()
for _, filter := range w.filters { for _, filter := range w.filters {
@ -212,6 +206,7 @@ func (w *WakuNode) Stop() {
w.filters = nil w.filters = nil
} }
w.relay.Stop()
w.lightPush.Stop() w.lightPush.Stop()
w.store.Stop() w.store.Stop()
@ -253,15 +248,17 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...)
if err != nil { if err != nil {
return err return err
} }
_, err = w.Subscribe(w.ctx, nil) if w.opts.enableRelay {
_, err = w.relay.Subscribe(w.ctx, nil)
if err != nil { if err != nil {
return err return err
} }
}
// TODO: rlnRelay // TODO: rlnRelay
@ -346,93 +343,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
return &info.ID, w.addPeer(info, protocolID) return &info.ID, w.addPeer(info, protocolID)
} }
func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
t := relay.GetTopic(topic)
sub, isNew, err := node.relay.Subscribe(t)
// Subscribe store to topic
if isNew && node.opts.storeMsgs {
log.Info("Subscribing store to topic ", t)
node.bcaster.Register(node.store.MsgC)
}
// Subscribe filter
if isNew && node.filter != nil {
log.Info("Subscribing filter to topic ", t)
node.bcaster.Register(node.filter.MsgC)
}
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *protocol.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions[t] = append(node.subscriptions[t], subscription)
node.bcaster.Register(subscription.C)
go node.subscribeToTopic(t, subscription, sub)
return subscription, nil
}
func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) {
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
for {
select {
case <-subscription.quit:
subscription.mutex.Lock()
node.bcaster.Unregister(subscription.C) // Remove from broadcast list
close(subscription.C)
subscription.mutex.Unlock()
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
subscription.mutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
subscription.mutex.Unlock()
return
}
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := protocol.NewEnvelope(wakuMessage, string(t))
node.bcaster.Submit(envelope)
}
}
}
// Wrapper around WakuFilter.Subscribe // Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters // that adds a Filter object to node.filters
func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) { func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) {
@ -447,7 +357,7 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs // ContentFilterChan takes MessagePush structs
subs, err := node.filter.Subscribe(ctx, f) subs, err := node.filter.Subscribe(ctx, f)
if subs.RequestID == "" || err != nil { if err != nil || subs.RequestID == "" {
// Failed to subscribe // Failed to subscribe
log.Error("remote subscription to filter failed", err) log.Error("remote subscription to filter failed", err)
return return
@ -540,22 +450,6 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi
return nil return nil
} }
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
hash, err := node.relay.Publish(ctx, message, topic)
if err != nil {
return nil, err
}
return hash, nil
}
func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error { func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error {
info, err := peer.AddrInfoFromP2pAddr(address) info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil { if err != nil {

View File

@ -9,25 +9,25 @@ import (
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *pubsub.Subscription, host.Host) { func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5) port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err) require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host) relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
require.NoError(t, err) require.NoError(t, err)
sub, _, err := relay.Subscribe(topic) sub, err := relay.Subscribe(context.Background(), &topic)
require.NoError(t, err) require.NoError(t, err)
return relay, sub, host return relay, sub, host
@ -48,11 +48,11 @@ func TestWakuLightPush(t *testing.T) {
var testTopic relay.Topic = "/waku/2/go/lightpush/test" var testTopic relay.Topic = "/waku/2/go/lightpush/test"
node1, sub1, host1 := makeWakuRelay(t, testTopic) node1, sub1, host1 := makeWakuRelay(t, testTopic)
defer node1.Stop() defer node1.Stop()
defer sub1.Cancel() defer sub1.Unsubscribe()
node2, sub2, host2 := makeWakuRelay(t, testTopic) node2, sub2, host2 := makeWakuRelay(t, testTopic)
defer node2.Stop() defer node2.Stop()
defer sub2.Cancel() defer sub2.Unsubscribe()
ctx := context.Background() ctx := context.Background()
lightPushNode2 := NewWakuLightPush(ctx, host2, node2) lightPushNode2 := NewWakuLightPush(ctx, host2, node2)
@ -92,21 +92,15 @@ func TestWakuLightPush(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
_, err := sub1.Next(context.Background()) <-sub1.C
require.NoError(t, err) <-sub1.C
_, err = sub1.Next(context.Background())
require.NoError(t, err)
}() }()
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
_, err := sub2.Next(context.Background()) <-sub2.C
require.NoError(t, err) <-sub2.C
_, err = sub2.Next(context.Background())
require.NoError(t, err)
}() }()
// Verifying succesful request // Verifying succesful request
@ -127,10 +121,9 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil) client := NewWakuLightPush(ctx, clientHost, nil)
err = client.Start() err = client.Start()
require.Errorf(t, err, "relay is required") require.Errorf(t, err, "relay is required")
} }

View File

@ -1,4 +1,4 @@
package node package relay
import ( import (
"sync" "sync"
@ -12,23 +12,21 @@ type Subscription struct {
C chan *protocol.Envelope C chan *protocol.Envelope
closed bool closed bool
mutex sync.Mutex once sync.Once
quit chan struct{} quit chan struct{}
} }
// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() { func (subs *Subscription) Unsubscribe() {
subs.mutex.Lock() subs.once.Do(func() {
defer subs.mutex.Unlock()
if !subs.closed {
close(subs.quit)
subs.closed = true subs.closed = true
} close(subs.quit)
close(subs.C)
})
} }
// IsClosed determine whether a Subscription is still open for receiving messages // IsClosed determine whether a Subscription is still open for receiving messages
func (subs *Subscription) IsClosed() bool { func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock()
defer subs.mutex.Unlock()
return subs.closed return subs.closed
} }

View File

@ -4,17 +4,22 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"errors" "errors"
"fmt"
"sync" "sync"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/stats"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "go.opencensus.io/tag"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics"
waku_proto "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
) )
var log = logging.Logger("wakurelay") var log = logging.Logger("wakurelay")
@ -32,6 +37,11 @@ type WakuRelay struct {
topicsMutex sync.Mutex topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*pubsub.Topic wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription relaySubs map[Topic]*pubsub.Subscription
bcaster v2.Broadcaster
subscriptions map[Topic][]*Subscription
subscriptionsMutex sync.Mutex
} }
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn // Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
@ -40,12 +50,14 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(hash[:]) return string(hash[:])
} }
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay) w := new(WakuRelay)
w.host = h w.host = h
w.topics = make(map[Topic]bool) w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription) w.relaySubs = make(map[Topic]*pubsub.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
w.bcaster = bcaster
// default options required by WakuRelay // default options required by WakuRelay
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
@ -113,25 +125,24 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
return pubSubTopic, nil return pubSubTopic, nil
} }
func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) { func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) {
sub, ok := w.relaySubs[topic] sub, ok := w.relaySubs[topic]
if !ok { if !ok {
pubSubTopic, err := w.upsertTopic(topic) pubSubTopic, err := w.upsertTopic(topic)
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
sub, err = pubSubTopic.Subscribe() sub, err = pubSubTopic.Subscribe()
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
w.relaySubs[topic] = sub w.relaySubs[topic] = sub
log.Info("Subscribing to topic ", topic) log.Info("Subscribing to topic ", topic)
} }
isNew = !ok // ok will be true if subscription already exists return sub, nil
return sub, isNew, nil
} }
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
@ -175,4 +186,96 @@ func GetTopic(topic *Topic) Topic {
func (w *WakuRelay) Stop() { func (w *WakuRelay) Stop() {
w.host.RemoveStreamHandler(WakuRelayID_v200) w.host.RemoveStreamHandler(WakuRelayID_v200)
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
for _, topic := range w.Topics() {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
}
w.subscriptions = nil
}
func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
t := GetTopic(topic)
sub, err := w.subscribe(t)
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
w.subscriptions[t] = append(w.subscriptions[t], subscription)
if w.bcaster != nil {
w.bcaster.Register(subscription.C)
}
go w.subscribeToTopic(t, subscription, sub)
return subscription, nil
}
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func(msgChannel chan *pubsub.Message) {
for {
msg, err := sub.Next(ctx)
if err != nil {
log.Error(fmt.Errorf("subscription failed: %w", err))
sub.Cancel()
close(msgChannel)
for _, subscription := range w.subscriptions[Topic(sub.Topic())] {
subscription.Unsubscribe()
}
}
msgChannel <- msg
}
}(msgChannel)
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) {
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
subChannel := w.nextMessage(ctx, sub)
for {
select {
case <-subscription.quit:
if w.bcaster != nil {
w.bcaster.Unregister(subscription.C) // Remove from broadcast list
}
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel:
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, string(t))
if w.bcaster != nil {
w.bcaster.Submit(envelope)
}
}
}
} }

View File

@ -19,18 +19,13 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := NewWakuRelay(context.Background(), host) relay, err := NewWakuRelay(context.Background(), host, nil)
defer relay.Stop() defer relay.Stop()
require.NoError(t, err) require.NoError(t, err)
sub, isNew, err := relay.Subscribe(testTopic) sub, err := relay.subscribe(testTopic)
defer sub.Cancel() defer sub.Cancel()
require.NoError(t, err) require.NoError(t, err)
require.True(t, isNew)
_, isNew, err = relay.Subscribe(testTopic)
require.NoError(t, err)
require.False(t, isNew)
topics := relay.Topics() topics := relay.Topics()
require.Equal(t, 1, len(topics)) require.Equal(t, 1, len(topics))

View File

@ -707,6 +707,12 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
func (w *WakuStore) Stop() { func (w *WakuStore) Stop() {
w.started = false w.started = false
if w.MsgC != nil {
close(w.MsgC) close(w.MsgC)
}
if w.h != nil {
w.h.RemoveStreamHandler(StoreID_v20beta3) w.h.RemoveStreamHandler(StoreID_v20beta3)
}
} }