fix: code review

Richard Ramos 2024-10-15 14:46:34 -04:00
parent 0c12367392
commit 01880d33e4
GPG Key ID: 1CE87DB518195760 (no known key found for this signature in database)
7 changed files with 44 additions and 24 deletions

View File

@@ -8,7 +8,7 @@ import (
 "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
 )
-type Result interface {
+type StoreRequestResult interface {
 Cursor() []byte
 IsComplete() bool
 PeerID() peer.ID
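
Not part of the commit: a minimal sketch of consuming the renamed interface, using only the methods visible in this hunk. The helper name, the package clause and the zap logger are illustrative assumptions.

package common // assumed: whichever package declares StoreRequestResult

import "go.uber.org/zap"

// logStoreResult logs the metadata a StoreRequestResult exposes; purely
// illustrative, this helper does not exist in go-waku.
func logStoreResult(logger *zap.Logger, r StoreRequestResult) {
	logger.Info("store query result",
		zap.Stringer("peerID", r.PeerID()),
		zap.Bool("complete", r.IsComplete()),
		zap.Binary("cursor", r.Cursor()),
	)
}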

View File

@@ -20,14 +20,14 @@ type defaultStorenodeRequestor struct {
 store *store.WakuStore
 }
-func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error) {
+func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
 return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
 }
-func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error) {
+func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
 return d.store.Query(ctx, store.FilterCriteria{
 ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
 TimeStart: from,
 TimeEnd: to,
-}, store.WithPeer(peerID), store.WithPaging(false, 100), store.IncludeData(false))
+}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
 }
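
Not part of the commit: a sketch of a caller passing an explicit page size to the updated QueryWithCriteria, which previously ignored its pageSize argument in favour of a hard-coded 100. The package name, the import path of the common package and the queryLastHour helper are assumptions; the method signature is the one shown above.

package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/api/common" // import path assumed
)

// criteriaQuerier captures just the method exercised here, with the signature
// from the StorenodeRequestor interface in this commit.
type criteriaQuerier interface {
	QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
}

// queryLastHour asks a store node for the previous hour of messages, twenty
// results per page instead of the old fixed 100.
func queryLastHour(ctx context.Context, q criteriaQuerier, peerID peer.ID, pubsubTopic string, contentTopics []string) (common.StoreRequestResult, error) {
	from := time.Now().Add(-time.Hour).UnixNano()
	to := time.Now().UnixNano()
	return q.QueryWithCriteria(ctx, peerID, 20, pubsubTopic, contentTopics, &from, &to)
}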

View File

@@ -22,6 +22,7 @@ import (
 const maxContentTopicsPerRequest = 10
 const maxMsgHashesPerRequest = 50
+const messageFetchPageSize = 100
 // MessageTracker should keep track of messages it has seen before and
 // provide a way to determine whether a message exists or not. This
@@ -31,8 +32,8 @@ type MessageTracker interface {
 }
 type StorenodeRequestor interface {
-GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error)
-QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error)
+GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
+QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
 }
 // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
@@ -183,7 +184,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
 }
 }
-func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.Result, error), logger *zap.Logger, logMsg string) (common.Result, error) {
+func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.StoreRequestResult, error), logger *zap.Logger, logMsg string) (common.StoreRequestResult, error) {
 retry := true
 count := 1
 for retry && count <= m.params.maxAttemptsToRetrieveHistory {
@@ -217,11 +218,11 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 logging.Epoch("to", now),
 )
-result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
+result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
 return m.storenodeRequestor.QueryWithCriteria(
 ctx,
 interest.peerID,
-100,
+messageFetchPageSize,
 interest.contentFilter.PubsubTopic,
 contentTopics[batchFrom:batchTo],
 proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
@@ -252,7 +253,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 missingHashes = append(missingHashes, hash)
 }
-result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
+result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
 if err = result.Next(ctx); err != nil {
 return nil, err
 }
@@ -291,7 +292,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 defer utils.LogOnPanic()
 defer wg.Wait()
-result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
+result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
 queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
 defer cancel()
 return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)
@@ -312,7 +313,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 }
 }
-result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
+result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
 if err = result.Next(ctx); err != nil {
 return nil, err
 }
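
Not part of the commit: a simplified sketch of the bounded-retry pattern that storeQueryWithRetry wraps around the queryFunc closures above. Only the retry/count loop is visible in this diff, so the wait between attempts and the error handling here are assumptions.

package example

import (
	"context"
	"time"
)

// retryQuery calls query up to maxAttempts times, waiting `wait` between
// failed attempts, and returns the first successful result or the last error.
func retryQuery[T any](ctx context.Context, maxAttempts int, wait time.Duration, query func(context.Context) (T, error)) (T, error) {
	var zero T
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err := query(ctx)
		if err == nil {
			return result, nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-time.After(wait):
		}
	}
	return zero, lastErr
}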

View File

@@ -2,6 +2,7 @@ package publish
 import (
 "context"
+"errors"
 "github.com/libp2p/go-libp2p/core/peer"
 "github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
@@ -9,26 +10,41 @@ import (
 "github.com/waku-org/go-waku/waku/v2/protocol/relay"
 )
+var ErrRelayNotAvailable = errors.New("relay is not available")
+var ErrLightpushNotAvailable = errors.New("lightpush is not available")
 func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher {
 return &defaultPublisher{
-lightPush: lightpush,
+lightpush: lightpush,
 relay: relay,
 }
 }
 type defaultPublisher struct {
-lightPush *lightpush.WakuLightPush
+lightpush *lightpush.WakuLightPush
 relay *relay.WakuRelay
 }
-func (d *defaultPublisher) RelayListPeers(pubsubTopic string) []peer.ID {
-return d.relay.PubSub().ListPeers(pubsubTopic)
+func (d *defaultPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
+if d.relay == nil {
+return nil, ErrRelayNotAvailable
+}
+return d.relay.PubSub().ListPeers(pubsubTopic), nil
 }
 func (d *defaultPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
+if d.relay == nil {
+return pb.MessageHash{}, ErrRelayNotAvailable
+}
 return d.relay.Publish(ctx, message, relay.WithPubSubTopic(pubsubTopic))
 }
 func (d *defaultPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) {
-return d.lightPush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers))
+if d.lightpush == nil {
+return pb.MessageHash{}, ErrLightpushNotAvailable
+}
+return d.lightpush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers))
 }
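
Not part of the commit: a sketch of how a caller might react to the new sentinel errors, here by falling back from relay to lightpush. The fallback policy, the package name and the import path of the publish package are assumptions; the Publisher methods and error variables are the ones shown in this commit.

package example

import (
	"context"
	"errors"

	"github.com/waku-org/go-waku/waku/v2/api/publish" // import path assumed
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

// publishWithFallback tries relay first and, when the node has no relay
// mounted, retries the same message via lightpush with up to two peers.
func publishWithFallback(ctx context.Context, p publish.Publisher, msg *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
	hash, err := p.RelayPublish(ctx, msg, pubsubTopic)
	if errors.Is(err, publish.ErrRelayNotAvailable) {
		return p.LightpushPublish(ctx, msg, pubsubTopic, 2)
	}
	return hash, err
}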

View File

@@ -18,7 +18,7 @@ type defaultStorenodeMessageVerifier struct {
 store *store.WakuStore
 }
-func (d *defaultStorenodeMessageVerifier) MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
+func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
 var opts []store.RequestOption
 opts = append(opts, store.WithRequestID(requestID))
 opts = append(opts, store.WithPeer(peerID))

View File

@@ -33,7 +33,7 @@ type ISentCheck interface {
 type StorenodeMessageVerifier interface {
 // MessagesExist returns a list of the messages it found from a list of message hashes
-MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
+MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
 }
 // MessageSentCheck tracks the outgoing messages and check against store node
@@ -228,7 +228,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
 queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
 defer cancel()
-result, err := m.messageVerifier.MessagesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
+result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
 if err != nil {
 m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
 return []common.Hash{}
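
Not part of the commit: a sketch that turns the renamed MessageHashesExist response into the set of hashes the store node did not report back. The package name and helper names are assumptions; only the interface method shown above is relied on.

package example

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

// hashVerifier mirrors the single StorenodeMessageVerifier method from the diff.
type hashVerifier interface {
	MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

// missingAtStore returns the hashes the store node did not confirm.
func missingAtStore(ctx context.Context, v hashVerifier, requestID []byte, peerID peer.ID, sent []pb.MessageHash) ([]pb.MessageHash, error) {
	found, err := v.MessageHashesExist(ctx, requestID, peerID, uint64(len(sent)), sent)
	if err != nil {
		return nil, err
	}
	seen := make(map[string]struct{}, len(found))
	for _, h := range found {
		seen[fmt.Sprintf("%x", h)] = struct{}{}
	}
	var missing []pb.MessageHash
	for _, h := range sent {
		if _, ok := seen[fmt.Sprintf("%x", h)]; !ok {
			missing = append(missing, h)
		}
	}
	return missing, nil
}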

View File

@@ -38,7 +38,7 @@ func (pm PublishMethod) String() string {
 type Publisher interface {
 // RelayListPeers returns the list of peers for a pubsub topic
-RelayListPeers(pubsubTopic string) []peer.ID
+RelayListPeers(pubsubTopic string) ([]peer.ID, error)
 // RelayPublish publishes a message via WakuRelay
 RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error)
@@ -128,9 +128,12 @@ func (ms *MessageSender) Send(req *Request) error {
 return err
 }
 case Relay:
-peerCnt := len(ms.publisher.RelayListPeers(req.envelope.PubsubTopic()))
-logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
-_, err := ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic())
+peers, err := ms.publisher.RelayListPeers(req.envelope.PubsubTopic())
+if err != nil {
+return err
+}
+logger.Info("publishing message via relay", zap.Int("peerCnt", len(peers)))
+_, err = ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic())
 if err != nil {
 return err
 }
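
Not part of the commit: a sketch of what the updated RelayListPeers contract looks like for another Publisher implementation, mirroring the nil-guard defaultPublisher now applies. The staticPublisher type is hypothetical and only this one method is shown; a full implementation would also need the remaining Publisher methods.

package example

import (
	"errors"

	"github.com/libp2p/go-libp2p/core/peer"
)

// staticPublisher returns a fixed peer list, e.g. from a test double.
type staticPublisher struct {
	peers []peer.ID
}

// RelayListPeers now returns an error alongside the peer list, so callers such
// as MessageSender.Send can bail out when relay is unavailable.
func (s *staticPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
	if s.peers == nil {
		return nil, errors.New("no relay peers configured") // stand-in for ErrRelayNotAvailable
	}
	return s.peers, nil
}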