feat(waku2): use relay or lightpush depending on the number of peers (#2425)

This commit is contained in:
Richard Ramos 2021-11-09 15:16:05 -04:00 committed by GitHub
parent f47229a466
commit d35e0a339d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 54 additions and 10 deletions

2
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2 github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d h1:mWatRmDv+xopBdnd4SYj6I4mrqS0fTgMy2Q0r79LD0w= github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc h1:OBoMUanISPnSAoMg0GIGGz6raeohIbHyhCjFbfSuea4=
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -128,7 +128,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
} }
func (w *WakuNode) Start() error { func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
if w.opts.enableStore { if w.opts.enableStore {
w.startStore() w.startStore()
} }
@ -265,7 +265,7 @@ func (w *WakuNode) mountRendezvous() error {
} }
func (w *WakuNode) startStore() { func (w *WakuNode) startStore() {
w.store.Start(w.ctx, w.host) w.store.Start(w.ctx)
if w.opts.shouldResume { if w.opts.shouldResume {
// TODO: extract this to a function and run it when you go offline // TODO: extract this to a function and run it when you go offline

View File

@ -229,9 +229,38 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription,
return subscription, nil return subscription, nil
} }
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error {
if _, ok := w.topics[topic]; !ok {
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
}
log.Info("Unsubscribing from topic ", topic)
delete(w.topics, topic)
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
w.relaySubs[topic].Cancel()
delete(w.relaySubs, topic)
err := w.wakuRelayTopics[topic].Close()
if err != nil {
return err
}
delete(w.wakuRelayTopics, topic)
return nil
}
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024) msgChannel := make(chan *pubsub.Message, 1024)
go func(msgChannel chan *pubsub.Message) { go func(msgChannel chan *pubsub.Message) {
defer func() {
if r := recover(); r != nil {
log.Debug("recovered msgChannel")
}
}()
for { for {
msg, err := sub.Next(ctx) msg, err := sub.Next(ctx)
if err != nil { if err != nil {
@ -266,6 +295,9 @@ func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *p
} }
// TODO: if there are no more relay subscriptions, close the pubsub subscription // TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel: case msg := <-subChannel:
if msg == nil {
return
}
stats.Record(ctx, metrics.Messages.M(1)) stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{} wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {

View File

@ -236,9 +236,10 @@ type WakuStore struct {
} }
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msgProvider = p wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
return wakuStore return wakuStore
} }
@ -249,13 +250,12 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) {
} }
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context, h host.Host) { func (store *WakuStore) Start(ctx context.Context) {
if store.started { if store.started {
return return
} }
store.started = true store.started = true
store.h = h
store.ctx = ctx store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope, 1024) store.MsgC = make(chan *protocol.Envelope, 1024)

2
vendor/modules.txt vendored
View File

@ -447,7 +447,7 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.1 # github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4 github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d # github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc
github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/v2 github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/discovery github.com/status-im/go-waku/waku/v2/discovery

View File

@ -31,6 +31,7 @@ type Config struct {
PersistPeers bool `toml:",omitempty"` PersistPeers bool `toml:",omitempty"`
PeerExchange bool `toml:",omitempty"` PeerExchange bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"` KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"`
LightClient bool `toml:",omitempty"` LightClient bool `toml:",omitempty"`
RelayNodes []string `toml:",omitempty"` RelayNodes []string `toml:",omitempty"`
StoreNodes []string `toml:",omitempty"` StoreNodes []string `toml:",omitempty"`
@ -47,4 +48,5 @@ var DefaultConfig = Config{
Port: 60000, Port: 60000,
KeepAliveInterval: 10, // second KeepAliveInterval: 10, // second
DiscoveryLimit: 40, DiscoveryLimit: 40,
MinPeersForRelay: 2, // TODO: determine correct value with Vac team
} }

View File

@ -80,6 +80,7 @@ const requestTimeout = 5 * time.Second
type settings struct { type settings struct {
LightClient bool // Indicates if the node is a light client LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol instead of Lightpush
MaxMsgSize uint32 // Maximal message length allowed by the waku node MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations EnableConfirmations bool // Enable sending message confirmations
SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from
@ -154,6 +155,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
MaxMsgSize: cfg.MaxMessageSize, MaxMsgSize: cfg.MaxMessageSize,
SoftBlacklistedPeerIDs: make(map[string]bool), SoftBlacklistedPeerIDs: make(map[string]bool),
LightClient: cfg.LightClient, LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay,
} }
waku.filters = common.NewFilters() waku.filters = common.NewFilters()
@ -760,15 +762,23 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
return nil return nil
} }
func (w *Waku) notEnoughPeers() bool {
topic := string(relay.GetTopic(nil))
numPeers := len(w.node.Relay().PubSub().ListPeers(topic))
return numPeers <= w.settings.MinPeersForRelay
}
// Send injects a message into the waku send queue, to be distributed in the // Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
var err error var err error
var hash []byte var hash []byte
if w.settings.LightClient { if w.settings.LightClient || w.notEnoughPeers() {
log.Debug("publishing message via lightpush")
hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil) hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil)
} else { } else {
log.Debug("publishing message via relay")
hash, err = w.node.Relay().Publish(context.Background(), msg, nil) hash, err = w.node.Relay().Publish(context.Background(), msg, nil)
} }