diff --git a/go.mod b/go.mod index 63ce6da83..57f5baa80 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d + github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index 269563de3..d85568412 100644 --- a/go.sum +++ b/go.sum @@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d h1:mWatRmDv+xopBdnd4SYj6I4mrqS0fTgMy2Q0r79LD0w= -github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= +github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc h1:OBoMUanISPnSAoMg0GIGGz6raeohIbHyhCjFbfSuea4= +github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index e09f85007..8d64967cb 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -128,7 +128,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) + w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { w.startStore() } @@ -265,7 +265,7 @@ func (w *WakuNode) mountRendezvous() error { } func (w *WakuNode) startStore() { - w.store.Start(w.ctx, w.host) + w.store.Start(w.ctx) if w.opts.shouldResume { // TODO: extract this to a function and run it when you go offline diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go index 8189d4ead..8e4c79329 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -229,9 +229,38 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, return subscription, nil } +func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error { + if _, ok := w.topics[topic]; !ok { + return fmt.Errorf("topics %s is not subscribed", (string)(topic)) + } + log.Info("Unsubscribing from topic ", topic) + delete(w.topics, topic) + + for _, sub := range w.subscriptions[topic] { + sub.Unsubscribe() + } + + w.relaySubs[topic].Cancel() + delete(w.relaySubs, topic) + + err := w.wakuRelayTopics[topic].Close() + if err != nil { + return err + } + delete(w.wakuRelayTopics, topic) + + return nil +} + func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { msgChannel := make(chan *pubsub.Message, 1024) go func(msgChannel chan *pubsub.Message) { + defer func() { + if r := recover(); r != nil { + log.Debug("recovered msgChannel") + } + }() + for { msg, err := sub.Next(ctx) if err != nil { @@ -266,6 +295,9 @@ func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *p } // TODO: if there are no more relay subscriptions, close the pubsub subscription case msg := <-subChannel: + if msg == nil { + return + } stats.Record(ctx, metrics.Messages.M(1)) wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index e1f64ca3a..5bccc0c99 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -236,9 +236,10 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { +func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p + wakuStore.h = host wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } @@ -249,13 +250,12 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) { } // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider -func (store *WakuStore) Start(ctx context.Context, h host.Host) { +func (store *WakuStore) Start(ctx context.Context) { if store.started { return } store.started = true - store.h = h store.ctx = ctx store.MsgC = make(chan *protocol.Envelope, 1024) diff --git a/vendor/modules.txt b/vendor/modules.txt index 85be03ae5..8a28f991f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -447,7 +447,7 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d +# github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/v2 github.com/status-im/go-waku/waku/v2/discovery diff --git a/wakuv2/config.go b/wakuv2/config.go index 6381da5bb..c7b0deb06 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -31,6 +31,7 @@ type Config struct { PersistPeers bool `toml:",omitempty"` PeerExchange bool `toml:",omitempty"` KeepAliveInterval int `toml:",omitempty"` + MinPeersForRelay int `toml:",omitempty"` LightClient bool `toml:",omitempty"` RelayNodes []string `toml:",omitempty"` StoreNodes []string `toml:",omitempty"` @@ -47,4 +48,5 @@ var DefaultConfig = Config{ Port: 60000, KeepAliveInterval: 10, // second DiscoveryLimit: 40, + MinPeersForRelay: 2, // TODO: determine correct value with Vac team } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 71f1f1d70..f48fa2f6e 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -80,6 +80,7 @@ const requestTimeout = 5 * time.Second type settings struct { LightClient bool // Indicates if the node is a light client + MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol instead of Lightpush MaxMsgSize uint32 // Maximal message length allowed by the waku node EnableConfirmations bool // Enable sending message confirmations SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from @@ -154,6 +155,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, MaxMsgSize: cfg.MaxMessageSize, SoftBlacklistedPeerIDs: make(map[string]bool), LightClient: cfg.LightClient, + MinPeersForRelay: cfg.MinPeersForRelay, } waku.filters = common.NewFilters() @@ -760,15 +762,23 @@ func (w *Waku) UnsubscribeMany(ids []string) error { return nil } +func (w *Waku) notEnoughPeers() bool { + topic := string(relay.GetTopic(nil)) + numPeers := len(w.node.Relay().PubSub().ListPeers(topic)) + return numPeers <= w.settings.MinPeersForRelay +} + // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { var err error var hash []byte - if w.settings.LightClient { + if w.settings.LightClient || w.notEnoughPeers() { + log.Debug("publishing message via lightpush") hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil) } else { + log.Debug("publishing message via relay") hash, err = w.node.Relay().Publish(context.Background(), msg, nil) }