fix: waku2 - send messages in a separate goroutine (#2433)

This commit is contained in:
Richard Ramos 2021-11-18 14:31:28 -04:00 committed by GitHub
parent b4c4cf6241
commit 6099380d9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 32 deletions

View File

@ -261,14 +261,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
PersistPeers: nodeConfig.WakuV2Config.PersistPeers,
}
if cfg.Host == "" {
cfg.Host = wakuv2.DefaultConfig.Host
}
if cfg.DiscoveryLimit == 0 {
cfg.DiscoveryLimit = wakuv2.DefaultConfig.DiscoveryLimit
}
if nodeConfig.WakuV2Config.MaxMessageSize > 0 {
cfg.MaxMessageSize = nodeConfig.WakuV2Config.MaxMessageSize
}

View File

@ -113,8 +113,9 @@ type Waku struct {
bandwidthCounter *metrics.BandwidthCounter
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
quit chan struct{} // Channel used for graceful exit
sendQueue chan *pb.WakuMessage
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
quit chan struct{} // Channel used for graceful exit
settings settings // Holds configuration settings that can be dynamically changed
settingsMu sync.RWMutex // Mutex to sync the settings access
@ -126,17 +127,47 @@ type Waku struct {
logger *zap.Logger
}
func setDefaults(cfg *Config) *Config {
if cfg == nil {
cfg = new(Config)
}
if cfg.MaxMessageSize == 0 {
cfg.MaxMessageSize = DefaultConfig.MaxMessageSize
}
if cfg.Host == "" {
cfg.Host = DefaultConfig.Host
}
if cfg.Port == 0 {
cfg.Port = DefaultConfig.Port
}
if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
if cfg.DiscoveryLimit == 0 {
cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit
}
if cfg.MinPeersForRelay == 0 {
cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay
}
return cfg
}
// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, error) {
if logger == nil {
logger = zap.NewNop()
}
cfg = setDefaults(cfg)
logger.Debug("starting wakuv2 with config", zap.Any("config", cfg))
if cfg == nil {
c := DefaultConfig
cfg = &c
}
waku := &Waku{
privateKeys: make(map[string]*ecdsa.PrivateKey),
@ -144,6 +175,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *pb.WakuMessage, 1000),
quit: make(chan struct{}),
dnsAddressCache: make(map[string][]multiaddr.Multiaddr),
dnsAddressCacheLock: &sync.RWMutex{},
@ -768,40 +800,68 @@ func (w *Waku) notEnoughPeers() bool {
return numPeers <= w.settings.MinPeersForRelay
}
func (w *Waku) broadcast() {
for {
select {
case msg := <-w.sendQueue:
hash, err := msg.Hash()
if err != nil {
log.Error("invalid message")
continue
}
if w.settings.LightClient || w.notEnoughPeers() {
log.Debug("publishing message via lightpush", zap.Any("hash", hexutil.Encode(hash)))
_, err = w.node.Lightpush().Publish(context.Background(), msg, nil)
} else {
log.Debug("publishing message via relay", zap.Any("hash", hexutil.Encode(hash)))
_, err = w.node.Relay().Publish(context.Background(), msg, nil)
}
if err != nil {
log.Error("could not send message", zap.Any("hash", hexutil.Encode(hash)), zap.Error(err))
w.envelopeFeed.Send(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(hash),
Event: common.EventEnvelopeExpired,
})
continue
}
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: gethcommon.BytesToHash(hash),
}
w.SendEnvelopeEvent(event)
case <-w.quit:
return
}
}
}
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
var err error
var hash []byte
if w.settings.LightClient || w.notEnoughPeers() {
log.Debug("publishing message via lightpush")
hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil)
} else {
log.Debug("publishing message via relay")
hash, err = w.node.Relay().Publish(context.Background(), msg, nil)
}
hash, err := msg.Hash()
if err != nil {
return nil, err
}
w.sendQueue <- msg
w.poolMu.Lock()
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)]
w.poolMu.Unlock()
if !alreadyCached {
envelope := wakuprotocol.NewEnvelope(msg, string(relay.GetTopic(nil)))
recvMessage := common.NewReceivedMessage(envelope)
w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage)
}
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: gethcommon.BytesToHash(hash),
}
w.SendEnvelopeEvent(event)
return hash, nil
}
@ -850,6 +910,8 @@ func (w *Waku) Start() error {
go w.processQueue()
}
go w.broadcast()
return nil
}