diff --git a/waku/v2/api/common/result.go b/waku/v2/api/common/result.go index 17dac099..d8ce175a 100644 --- a/waku/v2/api/common/result.go +++ b/waku/v2/api/common/result.go @@ -8,7 +8,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) -type Result interface { +type StoreRequestResult interface { Cursor() []byte IsComplete() bool PeerID() peer.ID diff --git a/waku/v2/api/missing/default_requestor.go b/waku/v2/api/missing/default_requestor.go index ffd146ca..248d61c6 100644 --- a/waku/v2/api/missing/default_requestor.go +++ b/waku/v2/api/missing/default_requestor.go @@ -20,14 +20,14 @@ type defaultStorenodeRequestor struct { store *store.WakuStore } -func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error) { +func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error) { +func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) { return d.store.Query(ctx, store.FilterCriteria{ ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), TimeStart: from, TimeEnd: to, - }, store.WithPeer(peerID), store.WithPaging(false, 100), store.IncludeData(false)) + }, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false)) } diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index aa1ee4ff..1af991eb 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -22,6 +22,7 @@ import ( const maxContentTopicsPerRequest = 10 const maxMsgHashesPerRequest = 50 +const messageFetchPageSize = 100 // MessageTracker should keep track of messages it has seen before and // provide a way to determine whether a message exists or not. This @@ -31,8 +32,8 @@ type MessageTracker interface { } type StorenodeRequestor interface { - GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error) - QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error) + GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) + QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) } // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria @@ -183,7 +184,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } } -func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.Result, error), logger *zap.Logger, logMsg string) (common.Result, error) { +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.StoreRequestResult, error), logger *zap.Logger, logMsg string) (common.StoreRequestResult, error) { retry := true count := 1 for retry && count <= m.params.maxAttemptsToRetrieveHistory { @@ -217,11 +218,11 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, logging.Epoch("to", now), ) - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { return m.storenodeRequestor.QueryWithCriteria( ctx, interest.peerID, - 100, + messageFetchPageSize, interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo], proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), @@ -252,7 +253,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, missingHashes = append(missingHashes, hash) } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { if err = result.Next(ctx); err != nil { return nil, err } @@ -291,7 +292,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer utils.LogOnPanic() defer wg.Wait() - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes) @@ -312,7 +313,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, } } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { if err = result.Next(ctx); err != nil { return nil, err } diff --git a/waku/v2/api/publish/default_publisher.go b/waku/v2/api/publish/default_publisher.go index 70f350ad..4ca940ce 100644 --- a/waku/v2/api/publish/default_publisher.go +++ b/waku/v2/api/publish/default_publisher.go @@ -2,6 +2,7 @@ package publish import ( "context" + "errors" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" @@ -9,26 +10,41 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) +var ErrRelayNotAvailable = errors.New("relay is not available") +var ErrLightpushNotAvailable = errors.New("lightpush is not available") + func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher { return &defaultPublisher{ - lightPush: lightpush, + lightpush: lightpush, relay: relay, } } type defaultPublisher struct { - lightPush *lightpush.WakuLightPush + lightpush *lightpush.WakuLightPush relay *relay.WakuRelay } -func (d *defaultPublisher) RelayListPeers(pubsubTopic string) []peer.ID { - return d.relay.PubSub().ListPeers(pubsubTopic) +func (d *defaultPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) { + if d.relay == nil { + return nil, ErrRelayNotAvailable + } + + return d.relay.PubSub().ListPeers(pubsubTopic), nil } func (d *defaultPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + if d.relay == nil { + return pb.MessageHash{}, ErrRelayNotAvailable + } + return d.relay.Publish(ctx, message, relay.WithPubSubTopic(pubsubTopic)) } func (d *defaultPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) { - return d.lightPush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers)) + if d.lightpush == nil { + return pb.MessageHash{}, ErrLightpushNotAvailable + } + + return d.lightpush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers)) } diff --git a/waku/v2/api/publish/default_verifier.go b/waku/v2/api/publish/default_verifier.go index 76fb2b39..68eca030 100644 --- a/waku/v2/api/publish/default_verifier.go +++ b/waku/v2/api/publish/default_verifier.go @@ -18,7 +18,7 @@ type defaultStorenodeMessageVerifier struct { store *store.WakuStore } -func (d *defaultStorenodeMessageVerifier) MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { +func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { var opts []store.RequestOption opts = append(opts, store.WithRequestID(requestID)) opts = append(opts, store.WithPeer(peerID)) diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index 11a37778..8a37e20c 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -33,7 +33,7 @@ type ISentCheck interface { type StorenodeMessageVerifier interface { // MessagesExist returns a list of the messages it found from a list of message hashes - MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) + MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) } // MessageSentCheck tracks the outgoing messages and check against store node @@ -228,7 +228,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) defer cancel() - result, err := m.messageVerifier.MessagesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes) + result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes) if err != nil { m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) return []common.Hash{} diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index c892fabd..c457589e 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -38,7 +38,7 @@ func (pm PublishMethod) String() string { type Publisher interface { // RelayListPeers returns the list of peers for a pubsub topic - RelayListPeers(pubsubTopic string) []peer.ID + RelayListPeers(pubsubTopic string) ([]peer.ID, error) // RelayPublish publishes a message via WakuRelay RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) @@ -128,9 +128,12 @@ func (ms *MessageSender) Send(req *Request) error { return err } case Relay: - peerCnt := len(ms.publisher.RelayListPeers(req.envelope.PubsubTopic())) - logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic()) + peers, err := ms.publisher.RelayListPeers(req.envelope.PubsubTopic()) + if err != nil { + return err + } + logger.Info("publishing message via relay", zap.Int("peerCnt", len(peers))) + _, err = ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic()) if err != nil { return err }