refactor(waku2): publish messages on a goroutine (#4466)

This commit is contained in:
richΛrd 2023-12-15 08:02:04 -04:00 committed by GitHub
parent 90c31afe7c
commit 93aeefcb89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 26 deletions

View File

@ -1026,43 +1026,49 @@ func (w *Waku) broadcast() {
for {
select {
case envelope := <-w.sendQueue:
pubsubTopic := envelope.PubsubTopic()
var err error
logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
// For now only used in testing to simulate going offline
logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publishFn
if w.settings.SkipPublishToTopic {
err = errors.New("Test send failure")
// For now only used in testing to simulate going offline
fn = func(env *protocol.Envelope, logger *zap.Logger) error { return errors.New("test send failure") }
} else if w.settings.LightClient {
w.logger.Info("publishing message via lightpush")
_, err = w.node.Lightpush().Publish(w.ctx, envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic))
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
return err
}
} else {
logger.Info("publishing message via relay")
_, err = w.node.Relay().Publish(w.ctx, envelope.Message(), relay.WithPubSubTopic(pubsubTopic))
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via relay")
_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
return err
}
}
if err != nil {
logger.Error("could not send message", zap.Error(err))
w.envelopeFeed.Send(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash()),
Event: common.EventEnvelopeExpired,
})
continue
}
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: gethcommon.BytesToHash(envelope.Hash()),
}
w.SendEnvelopeEvent(event)
go w.publishEnvelope(envelope, fn, logger)
case <-w.ctx.Done():
return
}
}
}
type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {
var event common.EventType
if err := publishFn(envelope, logger); err != nil {
logger.Error("could not send message", zap.Error(err))
event = common.EventEnvelopeExpired
} else {
event = common.EventEnvelopeSent
}
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash()),
Event: event,
})
}
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {