diff --git a/node/status_node_services.go b/node/status_node_services.go index 7540f9f4f..199868505 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -261,14 +261,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, PersistPeers: nodeConfig.WakuV2Config.PersistPeers, } - if cfg.Host == "" { - cfg.Host = wakuv2.DefaultConfig.Host - } - - if cfg.DiscoveryLimit == 0 { - cfg.DiscoveryLimit = wakuv2.DefaultConfig.DiscoveryLimit - } - if nodeConfig.WakuV2Config.MaxMessageSize > 0 { cfg.MaxMessageSize = nodeConfig.WakuV2Config.MaxMessageSize } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 6cc76d2b9..f2aaca1ff 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -113,8 +113,9 @@ type Waku struct { bandwidthCounter *metrics.BandwidthCounter - msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded - quit chan struct{} // Channel used for graceful exit + sendQueue chan *pb.WakuMessage + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded + quit chan struct{} // Channel used for graceful exit settings settings // Holds configuration settings that can be dynamically changed settingsMu sync.RWMutex // Mutex to sync the settings access @@ -126,17 +127,47 @@ type Waku struct { logger *zap.Logger } +func setDefaults(cfg *Config) *Config { + if cfg == nil { + cfg = new(Config) + } + + if cfg.MaxMessageSize == 0 { + cfg.MaxMessageSize = DefaultConfig.MaxMessageSize + } + + if cfg.Host == "" { + cfg.Host = DefaultConfig.Host + } + + if cfg.Port == 0 { + cfg.Port = DefaultConfig.Port + } + + if cfg.KeepAliveInterval == 0 { + cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval + } + + if cfg.DiscoveryLimit == 0 { + cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit + } + + if cfg.MinPeersForRelay == 0 { + cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay + } + + return cfg +} + // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, error) { if logger == nil { logger = zap.NewNop() } + cfg = setDefaults(cfg) + logger.Debug("starting wakuv2 with config", zap.Any("config", cfg)) - if cfg == nil { - c := DefaultConfig - cfg = &c - } waku := &Waku{ privateKeys: make(map[string]*ecdsa.PrivateKey), @@ -144,6 +175,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), expirations: make(map[uint32]mapset.Set), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), + sendQueue: make(chan *pb.WakuMessage, 1000), quit: make(chan struct{}), dnsAddressCache: make(map[string][]multiaddr.Multiaddr), dnsAddressCacheLock: &sync.RWMutex{}, @@ -768,40 +800,68 @@ func (w *Waku) notEnoughPeers() bool { return numPeers <= w.settings.MinPeersForRelay } +func (w *Waku) broadcast() { + for { + select { + case msg := <-w.sendQueue: + + hash, err := msg.Hash() + if err != nil { + log.Error("invalid message") + continue + } + + if w.settings.LightClient || w.notEnoughPeers() { + log.Debug("publishing message via lightpush", zap.Any("hash", hexutil.Encode(hash))) + _, err = w.node.Lightpush().Publish(context.Background(), msg, nil) + } else { + log.Debug("publishing message via relay", zap.Any("hash", hexutil.Encode(hash))) + _, err = w.node.Relay().Publish(context.Background(), msg, nil) + } + + if err != nil { + log.Error("could not send message", zap.Any("hash", hexutil.Encode(hash)), zap.Error(err)) + w.envelopeFeed.Send(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(hash), + Event: common.EventEnvelopeExpired, + }) + + continue + } + + event := common.EnvelopeEvent{ + Event: common.EventEnvelopeSent, + Hash: gethcommon.BytesToHash(hash), + } + + w.SendEnvelopeEvent(event) + + case <-w.quit: + return + } + } +} + // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { - var err error - var hash []byte - - if w.settings.LightClient || w.notEnoughPeers() { - log.Debug("publishing message via lightpush") - hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil) - } else { - log.Debug("publishing message via relay") - hash, err = w.node.Relay().Publish(context.Background(), msg, nil) - } - + hash, err := msg.Hash() if err != nil { return nil, err } + w.sendQueue <- msg + w.poolMu.Lock() _, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)] w.poolMu.Unlock() if !alreadyCached { envelope := wakuprotocol.NewEnvelope(msg, string(relay.GetTopic(nil))) recvMessage := common.NewReceivedMessage(envelope) + w.postEvent(recvMessage) // notify the local node about the new message w.addEnvelope(recvMessage) } - event := common.EnvelopeEvent{ - Event: common.EventEnvelopeSent, - Hash: gethcommon.BytesToHash(hash), - } - - w.SendEnvelopeEvent(event) - return hash, nil } @@ -850,6 +910,8 @@ func (w *Waku) Start() error { go w.processQueue() } + go w.broadcast() + return nil }