refactor_: extract libwaku calls into WakuNode struct (#6027)
This commit is contained in:
parent
d17610d280
commit
28ee341959
|
@ -8,5 +8,5 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type StorenodeRequestor interface {
|
type StorenodeRequestor interface {
|
||||||
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
|
Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -337,6 +337,24 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
|
||||||
return m.activeStorenode
|
return m.activeStorenode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
storeNodes, err := m.storenodeConfigProvider.Storenodes()
|
||||||
|
if err != nil {
|
||||||
|
return peer.AddrInfo{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range storeNodes {
|
||||||
|
if p.ID == m.activeStorenode {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return peer.AddrInfo{}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
|
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
|
||||||
return m.storenodeStatus(peerID) == connected
|
return m.storenodeStatus(peerID) == connected
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ type HistoryRetriever struct {
|
||||||
|
|
||||||
type HistoryProcessor interface {
|
type HistoryProcessor interface {
|
||||||
OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
|
OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
|
||||||
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
|
OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
|
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
|
||||||
|
@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo
|
||||||
func (hr *HistoryRetriever) Query(
|
func (hr *HistoryRetriever) Query(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
criteria store.FilterCriteria,
|
criteria store.FilterCriteria,
|
||||||
storenodeID peer.ID,
|
storenode peer.AddrInfo,
|
||||||
pageLimit uint64,
|
pageLimit uint64,
|
||||||
shouldProcessNextPage func(int) (bool, uint64),
|
shouldProcessNextPage func(int) (bool, uint64),
|
||||||
processEnvelopes bool,
|
processEnvelopes bool,
|
||||||
|
@ -159,7 +159,7 @@ loop:
|
||||||
}()
|
}()
|
||||||
|
|
||||||
queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
|
queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
|
||||||
cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger)
|
cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger)
|
||||||
queryCancel()
|
queryCancel()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -212,7 +212,7 @@ loop:
|
||||||
|
|
||||||
func (hr *HistoryRetriever) createMessagesRequest(
|
func (hr *HistoryRetriever) createMessagesRequest(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
peerID peer.ID,
|
peerInfo peer.AddrInfo,
|
||||||
criteria store.FilterCriteria,
|
criteria store.FilterCriteria,
|
||||||
cursor []byte,
|
cursor []byte,
|
||||||
limit uint64,
|
limit uint64,
|
||||||
|
@ -228,7 +228,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
|
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes)
|
||||||
resultCh <- struct {
|
resultCh <- struct {
|
||||||
storeCursor []byte
|
storeCursor []byte
|
||||||
envelopesCount int
|
envelopesCount int
|
||||||
|
@ -244,7 +244,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
go func() {
|
go func() {
|
||||||
_, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
|
_, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("failed to request store messages", zap.Error(err))
|
logger.Error("failed to request store messages", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -254,9 +254,9 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
|
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
|
||||||
requestID := protocol.GenerateRequestID()
|
requestID := protocol.GenerateRequestID()
|
||||||
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
|
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID))
|
||||||
|
|
||||||
logger.Debug("store.query",
|
logger.Debug("store.query",
|
||||||
logging.Timep("startTime", criteria.TimeStart),
|
logging.Timep("startTime", criteria.TimeStart),
|
||||||
|
@ -278,12 +278,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
|
||||||
}
|
}
|
||||||
|
|
||||||
queryStart := time.Now()
|
queryStart := time.Now()
|
||||||
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
|
result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest)
|
||||||
queryDuration := time.Since(queryStart)
|
queryDuration := time.Since(queryStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("error querying storenode", zap.Error(err))
|
logger.Error("error querying storenode", zap.Error(err))
|
||||||
|
|
||||||
hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
|
hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err)
|
||||||
|
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type criteriaInterest struct {
|
type criteriaInterest struct {
|
||||||
peerID peer.ID
|
peerInfo peer.AddrInfo
|
||||||
contentFilter protocol.ContentFilter
|
contentFilter protocol.ContentFilter
|
||||||
lastChecked time.Time
|
lastChecked time.Time
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ type criteriaInterest struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c criteriaInterest) equals(other criteriaInterest) bool {
|
func (c criteriaInterest) equals(other criteriaInterest) bool {
|
||||||
if c.peerID != other.peerID {
|
if c.peerInfo.ID != other.peerInfo.ID {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct {
|
||||||
store *store.WakuStore
|
store *store.WakuStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
|
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
|
||||||
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
|
return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
|
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
|
||||||
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
|
return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest)
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,13 +66,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) {
|
func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) {
|
||||||
m.criteriaInterestMu.Lock()
|
m.criteriaInterestMu.Lock()
|
||||||
defer m.criteriaInterestMu.Unlock()
|
defer m.criteriaInterestMu.Unlock()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(m.ctx)
|
ctx, cancel := context.WithCancel(m.ctx)
|
||||||
criteriaInterest := criteriaInterest{
|
criteriaInterest := criteriaInterest{
|
||||||
peerID: peerID,
|
peerInfo: peerInfo,
|
||||||
contentFilter: contentFilter,
|
contentFilter: contentFilter,
|
||||||
lastChecked: m.timesource.Now().Add(-m.params.delay),
|
lastChecked: m.timesource.Now().Add(-m.params.delay),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
@ -164,7 +164,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.Error("could not fetch history",
|
m.logger.Error("could not fetch history",
|
||||||
zap.Stringer("peerID", interest.peerID),
|
zap.Stringer("peerID", interest.peerInfo.ID),
|
||||||
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
||||||
zap.Strings("contentTopics", contentTopics))
|
zap.Strings("contentTopics", contentTopics))
|
||||||
continue
|
continue
|
||||||
|
@ -207,7 +207,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
||||||
contentTopics := interest.contentFilter.ContentTopics.ToList()
|
contentTopics := interest.contentFilter.ContentTopics.ToList()
|
||||||
|
|
||||||
logger := m.logger.With(
|
logger := m.logger.With(
|
||||||
zap.Stringer("peerID", interest.peerID),
|
zap.Stringer("peerID", interest.peerInfo.ID),
|
||||||
zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
|
zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
|
||||||
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
||||||
logging.Epoch("from", interest.lastChecked),
|
logging.Epoch("from", interest.lastChecked),
|
||||||
|
@ -226,7 +226,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
||||||
|
|
||||||
return m.storenodeRequestor.Query(
|
return m.storenodeRequestor.Query(
|
||||||
ctx,
|
ctx,
|
||||||
interest.peerID,
|
interest.peerInfo,
|
||||||
storeQueryRequest,
|
storeQueryRequest,
|
||||||
)
|
)
|
||||||
}, logger, "retrieving history to check for missing messages")
|
}, logger, "retrieving history to check for missing messages")
|
||||||
|
@ -309,7 +309,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
||||||
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
|
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
|
||||||
}
|
}
|
||||||
|
|
||||||
return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
|
return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest)
|
||||||
}, logger, "retrieving missing messages")
|
}, logger, "retrieving missing messages")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
|
|
@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct {
|
||||||
store *store.WakuStore
|
store *store.WakuStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||||
var opts []store.RequestOption
|
var opts []store.RequestOption
|
||||||
opts = append(opts, store.WithRequestID(requestID))
|
opts = append(opts, store.WithRequestID(requestID))
|
||||||
opts = append(opts, store.WithPeer(peerID))
|
opts = append(opts, store.WithPeerAddr(peerID.Addrs...))
|
||||||
opts = append(opts, store.WithPaging(false, pageSize))
|
opts = append(opts, store.WithPaging(false, pageSize))
|
||||||
opts = append(opts, store.IncludeData(false))
|
opts = append(opts, store.IncludeData(false))
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ type ISentCheck interface {
|
||||||
|
|
||||||
type StorenodeMessageVerifier interface {
|
type StorenodeMessageVerifier interface {
|
||||||
// MessagesExist returns a list of the messages it found from a list of message hashes
|
// MessagesExist returns a list of the messages it found from a list of message hashes
|
||||||
MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
|
MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MessageSentCheck tracks the outgoing messages and check against store node
|
// MessageSentCheck tracks the outgoing messages and check against store node
|
||||||
|
@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
|
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
|
||||||
selectedPeer := m.storenodeCycle.GetActiveStorenode()
|
selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo()
|
||||||
if selectedPeer == "" {
|
if selectedPeer.ID == "" {
|
||||||
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
|
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
|
||||||
return []common.Hash{}
|
return []common.Hash{}
|
||||||
}
|
}
|
||||||
|
@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
|
||||||
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
|
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
|
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes))
|
||||||
|
|
||||||
queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
|
queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
|
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
|
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err))
|
||||||
return []common.Hash{}
|
return []common.Hash{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -699,8 +699,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
|
||||||
|
|
||||||
// AddPeer is used to add a peer and the protocols it support to the node peerstore
|
// AddPeer is used to add a peer and the protocols it support to the node peerstore
|
||||||
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
|
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
|
||||||
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||||
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
|
pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
|
@ -684,13 +684,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPeer adds peer to the peerStore and also to service slots
|
// AddPeer adds peer to the peerStore and also to service slots
|
||||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
|
func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
|
||||||
//Assuming all addresses have peerId
|
//Assuming all addresses have peerId
|
||||||
info, err := peer.AddrInfoFromP2pAddr(address)
|
infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(infoArr) > 1 {
|
||||||
|
return nil, errors.New("only a single peerID is expected in AddPeer")
|
||||||
|
}
|
||||||
|
|
||||||
|
info := infoArr[0]
|
||||||
|
|
||||||
//Add Service peers to serviceSlots.
|
//Add Service peers to serviceSlots.
|
||||||
for _, proto := range protocols {
|
for _, proto := range protocols {
|
||||||
pm.addPeerToServiceSlot(proto, info.ID)
|
pm.addPeerToServiceSlot(proto, info.ID)
|
||||||
|
@ -704,10 +710,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo
|
||||||
|
|
||||||
pData := &service.PeerData{
|
pData := &service.PeerData{
|
||||||
Origin: origin,
|
Origin: origin,
|
||||||
AddrInfo: peer.AddrInfo{
|
AddrInfo: info,
|
||||||
ID: info.ID,
|
|
||||||
Addrs: info.Addrs,
|
|
||||||
},
|
|
||||||
PubsubTopics: pubsubTopics,
|
PubsubTopics: pubsubTopics,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||||
"github.com/libp2p/go-msgio/pbio"
|
"github.com/libp2p/go-msgio/pbio"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
|
@ -325,7 +326,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
|
||||||
|
|
||||||
//Add Peer to peerstore.
|
//Add Peer to peerstore.
|
||||||
if params.pm != nil && params.peerAddr != nil {
|
if params.pm != nil && params.peerAddr != nil {
|
||||||
pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
|
pData, err := wf.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"github.com/waku-org/go-waku/tests"
|
"github.com/waku-org/go-waku/tests"
|
||||||
|
@ -101,7 +102,7 @@ func (s *FilterTestSuite) TearDownTest() {
|
||||||
|
|
||||||
func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
|
func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
|
||||||
mAddr := tests.GetAddr(h2.h)
|
mAddr := tests.GetAddr(h2.h)
|
||||||
_, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
|
_, err := h1.pm.AddPeer([]multiaddr.Multiaddr{mAddr}, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
|
||||||
s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
|
s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -310,7 +310,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||||
|
|
||||||
//Add Peer to peerstore.
|
//Add Peer to peerstore.
|
||||||
if store.pm != nil && params.peerAddr != nil {
|
if store.pm != nil && params.peerAddr != nil {
|
||||||
pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
pData, err := store.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||||
"github.com/libp2p/go-msgio/pbio"
|
"github.com/libp2p/go-msgio/pbio"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
|
@ -273,7 +274,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.pm != nil && params.peerAddr != nil {
|
if params.pm != nil && params.peerAddr != nil {
|
||||||
pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
|
pData, err := wakuLP.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-msgio/pbio"
|
"github.com/libp2p/go-msgio/pbio"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
@ -36,7 +37,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.pm != nil && params.peerAddr != nil {
|
if params.pm != nil && params.peerAddr != nil {
|
||||||
pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
|
pData, err := wakuPX.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,15 +194,15 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
|
func (s *WakuStore) RequestRaw(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *pb.StoreQueryRequest) (Result, error) {
|
||||||
err := storeRequest.Validate()
|
err := storeRequest.Validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var params Parameters
|
var params Parameters
|
||||||
params.selectedPeer = peerID
|
params.peerAddr = peerInfo.Addrs
|
||||||
if params.selectedPeer == "" {
|
if len(params.peerAddr) == 0 {
|
||||||
return nil, ErrMustSelectPeer
|
return nil, ErrMustSelectPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
type Parameters struct {
|
type Parameters struct {
|
||||||
selectedPeer peer.ID
|
selectedPeer peer.ID
|
||||||
peerAddr multiaddr.Multiaddr
|
peerAddr []multiaddr.Multiaddr
|
||||||
peerSelectionType peermanager.PeerSelection
|
peerSelectionType peermanager.PeerSelection
|
||||||
preferredPeers peer.IDSlice
|
preferredPeers peer.IDSlice
|
||||||
requestID []byte
|
requestID []byte
|
||||||
|
@ -33,7 +33,7 @@ type RequestOption func(*Parameters) error
|
||||||
func WithPeer(p peer.ID) RequestOption {
|
func WithPeer(p peer.ID) RequestOption {
|
||||||
return func(params *Parameters) error {
|
return func(params *Parameters) error {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
if params.peerAddr != nil {
|
if len(params.peerAddr) != 0 {
|
||||||
return errors.New("WithPeer and WithPeerAddr options are mutually exclusive")
|
return errors.New("WithPeer and WithPeerAddr options are mutually exclusive")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -43,7 +43,7 @@ func WithPeer(p peer.ID) RequestOption {
|
||||||
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||||
// This new peer will be added to peerStore.
|
// This new peer will be added to peerStore.
|
||||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
|
func WithPeerAddr(pAddr ...multiaddr.Multiaddr) RequestOption {
|
||||||
return func(params *Parameters) error {
|
return func(params *Parameters) error {
|
||||||
params.peerAddr = pAddr
|
params.peerAddr = pAddr
|
||||||
if params.selectedPeer != "" {
|
if params.selectedPeer != "" {
|
||||||
|
|
|
@ -1869,7 +1869,7 @@ func (w *Waku) timestamp() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||||
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200)
|
peerID, err := w.node.AddPeer([]multiaddr.Multiaddr{address}, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,6 @@ func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnv
|
||||||
return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes)
|
return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
|
func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) {
|
||||||
hr.waku.onHistoricMessagesRequestFailed(requestID, peerID, err)
|
hr.waku.onHistoricMessagesRequestFailed(requestID, peerInfo, err)
|
||||||
}
|
}
|
||||||
|
|
1726
wakuv2/nwaku.go
1726
wakuv2/nwaku.go
File diff suppressed because it is too large
Load Diff
|
@ -12,9 +12,13 @@ import (
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v3"
|
"github.com/cenkalti/backoff/v3"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
@ -24,6 +28,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/status-im/status-go/protocol/tt"
|
"github.com/status-im/status-go/protocol/tt"
|
||||||
|
"github.com/status-im/status-go/wakuv2/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im"
|
var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im"
|
||||||
|
@ -150,6 +155,28 @@ func parseNodes(rec []string) []*enode.Node {
|
||||||
return ns
|
return ns
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testStorenodeConfigProvider struct {
|
||||||
|
storenode peer.AddrInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStorenodeConfigProvider) UseStorenodes() (bool, error) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStorenodeConfigProvider) GetPinnedStorenode() (peer.AddrInfo, error) {
|
||||||
|
return peer.AddrInfo{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStorenodeConfigProvider) Storenodes() ([]peer.AddrInfo, error) {
|
||||||
|
return []peer.AddrInfo{t.storenode}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestStorenodeConfigProvider(storenode peer.AddrInfo) history.StorenodeConfigProvider {
|
||||||
|
return &testStorenodeConfigProvider{
|
||||||
|
storenode: storenode,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// In order to run these tests, you must run an nwaku node
|
// In order to run these tests, you must run an nwaku node
|
||||||
//
|
//
|
||||||
// Using Docker:
|
// Using Docker:
|
||||||
|
@ -187,7 +214,7 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, w.Start())
|
require.NoError(t, w.Start())
|
||||||
|
|
||||||
enr, err := w.ENR()
|
enr, err := w.node.ENR()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, enr)
|
require.NotNil(t, enr)
|
||||||
|
|
||||||
|
@ -197,7 +224,7 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
|
|
||||||
// Sanity check, not great, but it's probably helpful
|
// Sanity check, not great, but it's probably helpful
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
numConnected, err := w.GetNumConnectedRelayPeers()
|
numConnected, err := w.node.GetNumConnectedPeers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -212,40 +239,39 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
// Get local store node address
|
// Get local store node address
|
||||||
storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0])
|
storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, err)
|
|
||||||
|
for i := 0; i <= 100; i++ {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode))
|
||||||
|
|
||||||
// Check that we are indeed connected to the store node
|
// Check that we are indeed connected to the store node
|
||||||
connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
|
connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||||
|
|
||||||
// Disconnect from the store node
|
// Disconnect from the store node
|
||||||
err = w.DropPeer(storeNode.ID)
|
err = w.node.DisconnectPeerByID(storeNode.ID)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Check that we are indeed disconnected
|
// Check that we are indeed disconnected
|
||||||
connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
|
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
|
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
|
||||||
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
|
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
|
||||||
|
|
||||||
storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Re-connect
|
// Re-connect
|
||||||
err = w.DialPeer(storeNodeMultiadd)
|
err = w.DialPeerByID(storeNode.ID)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
// Check that we are connected again
|
// Check that we are connected again
|
||||||
connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
|
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||||
|
|
||||||
/*
|
|
||||||
filter := &common.Filter{
|
filter := &common.Filter{
|
||||||
PubsubTopic: config.DefaultShardPubsubTopic,
|
PubsubTopic: w.cfg.DefaultShardPubsubTopic,
|
||||||
Messages: common.NewMemoryMessageStore(),
|
Messages: common.NewMemoryMessageStore(),
|
||||||
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
|
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
|
||||||
}
|
}
|
||||||
|
@ -258,7 +284,7 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
_, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{
|
msgID, err := w.Send(w.cfg.DefaultShardPubsubTopic, &pb.WakuMessage{
|
||||||
Payload: []byte{1, 2, 3, 4, 5},
|
Payload: []byte{1, 2, 3, 4, 5},
|
||||||
ContentTopic: contentTopic.ContentTopic(),
|
ContentTopic: contentTopic.ContentTopic(),
|
||||||
Version: proto.Uint32(0),
|
Version: proto.Uint32(0),
|
||||||
|
@ -266,6 +292,7 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, msgID, "1")
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
@ -280,28 +307,34 @@ func TestBasicWakuV2(t *testing.T) {
|
||||||
b.InitialInterval = 500 * time.Millisecond
|
b.InitialInterval = 500 * time.Millisecond
|
||||||
}
|
}
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
_, envelopeCount, err := w.Query(
|
err := w.HistoryRetriever.Query(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
storeNode.PeerID,
|
|
||||||
store.FilterCriteria{
|
store.FilterCriteria{
|
||||||
ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
|
ContentFilter: protocol.NewContentFilter(w.cfg.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
|
||||||
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
|
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
|
||||||
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
|
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
|
||||||
},
|
},
|
||||||
nil,
|
*storeNode,
|
||||||
nil,
|
10,
|
||||||
false,
|
nil, false,
|
||||||
)
|
)
|
||||||
if err != nil || envelopeCount == 0 {
|
|
||||||
|
return err
|
||||||
|
|
||||||
|
// TODO-nwaku
|
||||||
|
/*if err != nil || envelopeCount == 0 {
|
||||||
// in case of failure extend timestamp margin up to 40secs
|
// in case of failure extend timestamp margin up to 40secs
|
||||||
if marginInSeconds < 40 {
|
if marginInSeconds < 40 {
|
||||||
marginInSeconds += 5
|
marginInSeconds += 5
|
||||||
}
|
}
|
||||||
return errors.New("no messages received from store node")
|
return errors.New("no messages received from store node")
|
||||||
}
|
}
|
||||||
return nil
|
return nil*/
|
||||||
|
|
||||||
}, options)
|
}, options)
|
||||||
require.NoError(t, err) */
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
|
||||||
require.NoError(t, w.Stop())
|
require.NoError(t, w.Stop())
|
||||||
}
|
}
|
||||||
|
@ -356,10 +389,10 @@ func TestPeerExchange(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
discV5NodePeerId, err := discV5Node.PeerID()
|
discV5NodePeerId, err := discV5Node.node.PeerID()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
discv5NodeEnr, err := discV5Node.ENR()
|
discv5NodeEnr, err := discV5Node.node.ENR()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pxServerConfig := Config{
|
pxServerConfig := Config{
|
||||||
|
@ -387,7 +420,7 @@ func TestPeerExchange(t *testing.T) {
|
||||||
// Adding an extra second to make sure PX cache is not empty
|
// Adding an extra second to make sure PX cache is not empty
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
serverNodeMa, err := pxServerNode.ListenAddresses()
|
serverNodeMa, err := pxServerNode.node.ListenAddresses()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, serverNodeMa)
|
require.NotNil(t, serverNodeMa)
|
||||||
|
|
||||||
|
@ -398,7 +431,7 @@ func TestPeerExchange(t *testing.T) {
|
||||||
|
|
||||||
// Check that pxServerNode has discV5Node in its Peer Store
|
// Check that pxServerNode has discV5Node in its Peer Store
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
peers, err := pxServerNode.GetPeerIdsFromPeerStore()
|
peers, err := pxServerNode.node.GetPeerIDsFromPeerStore()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -436,12 +469,12 @@ func TestPeerExchange(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
pxServerPeerId, err := pxServerNode.PeerID()
|
pxServerPeerId, err := pxServerNode.node.PeerID()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Check that the light node discovered the discV5Node and has both nodes in its peer store
|
// Check that the light node discovered the discV5Node and has both nodes in its peer store
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
peers, err := lightNode.GetPeerIdsFromPeerStore()
|
peers, err := lightNode.node.GetPeerIDsFromPeerStore()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -455,7 +488,7 @@ func TestPeerExchange(t *testing.T) {
|
||||||
|
|
||||||
// Now perform the PX request manually to see if it also works
|
// Now perform the PX request manually to see if it also works
|
||||||
err = tt.RetryWithBackOff(func() error {
|
err = tt.RetryWithBackOff(func() error {
|
||||||
numPeersReceived, err := lightNode.WakuPeerExchangeRequest(1)
|
numPeersReceived, err := lightNode.node.PeerExchangeRequest(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -551,88 +584,6 @@ func TestPeerExchange(t *testing.T) {
|
||||||
require.NoError(t, discV5Node.Stop()) */
|
require.NoError(t, discV5Node.Stop()) */
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDial(t *testing.T) {
|
|
||||||
logger, err := zap.NewDevelopment()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
dialerNodeConfig := Config{
|
|
||||||
UseThrottledPublish: true,
|
|
||||||
ClusterID: 16,
|
|
||||||
}
|
|
||||||
|
|
||||||
// start node that will initiate the dial
|
|
||||||
dialerNodeWakuConfig := WakuConfig{
|
|
||||||
EnableRelay: true,
|
|
||||||
LogLevel: "DEBUG",
|
|
||||||
Discv5Discovery: false,
|
|
||||||
ClusterID: 16,
|
|
||||||
Shards: []uint16{64},
|
|
||||||
Discv5UdpPort: 9020,
|
|
||||||
TcpPort: 60020,
|
|
||||||
}
|
|
||||||
|
|
||||||
dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, dialerNode.Start())
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
receiverNodeConfig := Config{
|
|
||||||
UseThrottledPublish: true,
|
|
||||||
ClusterID: 16,
|
|
||||||
}
|
|
||||||
|
|
||||||
// start node that will receive the dial
|
|
||||||
receiverNodeWakuConfig := WakuConfig{
|
|
||||||
EnableRelay: true,
|
|
||||||
LogLevel: "DEBUG",
|
|
||||||
Discv5Discovery: false,
|
|
||||||
ClusterID: 16,
|
|
||||||
Shards: []uint16{64},
|
|
||||||
Discv5UdpPort: 9021,
|
|
||||||
TcpPort: 60021,
|
|
||||||
}
|
|
||||||
|
|
||||||
receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, receiverNode.Start())
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
receiverMultiaddr, err := receiverNode.ListenAddresses()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotNil(t, receiverMultiaddr)
|
|
||||||
|
|
||||||
// Check that both nodes start with no connected peers
|
|
||||||
dialerPeerCount, err := dialerNode.PeerCount()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")
|
|
||||||
|
|
||||||
receiverPeerCount, err := receiverNode.PeerCount()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
|
|
||||||
|
|
||||||
// Dial
|
|
||||||
err = dialerNode.DialPeer(receiverMultiaddr[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
// Check that both nodes now have one connected peer
|
|
||||||
dialerPeerCount, err = dialerNode.PeerCount()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")
|
|
||||||
|
|
||||||
receiverPeerCount, err = receiverNode.PeerCount()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
|
|
||||||
|
|
||||||
// Stop nodes
|
|
||||||
require.NoError(t, dialerNode.Stop())
|
|
||||||
require.NoError(t, receiverNode.Stop())
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDnsDiscover(t *testing.T) {
|
func TestDnsDiscover(t *testing.T) {
|
||||||
logger, err := zap.NewDevelopment()
|
logger, err := zap.NewDevelopment()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -655,15 +606,80 @@ func TestDnsDiscover(t *testing.T) {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
|
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
|
||||||
|
|
||||||
res, err := node.WakuDnsDiscovery(sampleEnrTree, nodeConfig.Nameserver, int(requestTimeout/time.Millisecond))
|
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nodeConfig.Nameserver)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
|
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
|
||||||
|
|
||||||
// Stop nodes
|
// Stop nodes
|
||||||
require.NoError(t, node.Stop())
|
require.NoError(t, node.Stop())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDial(t *testing.T) {
|
||||||
|
logger, err := zap.NewDevelopment()
|
||||||
|
require.NoError(t, err)
|
||||||
|
dialerNodeConfig := Config{
|
||||||
|
UseThrottledPublish: true,
|
||||||
|
ClusterID: 16,
|
||||||
|
}
|
||||||
|
// start node that will initiate the dial
|
||||||
|
dialerNodeWakuConfig := WakuConfig{
|
||||||
|
EnableRelay: true,
|
||||||
|
LogLevel: "DEBUG",
|
||||||
|
Discv5Discovery: false,
|
||||||
|
ClusterID: 16,
|
||||||
|
Shards: []uint16{64},
|
||||||
|
Discv5UdpPort: 9020,
|
||||||
|
TcpPort: 60020,
|
||||||
|
}
|
||||||
|
dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, dialerNode.Start())
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
receiverNodeConfig := Config{
|
||||||
|
UseThrottledPublish: true,
|
||||||
|
ClusterID: 16,
|
||||||
|
}
|
||||||
|
// start node that will receive the dial
|
||||||
|
receiverNodeWakuConfig := WakuConfig{
|
||||||
|
EnableRelay: true,
|
||||||
|
LogLevel: "DEBUG",
|
||||||
|
Discv5Discovery: false,
|
||||||
|
ClusterID: 16,
|
||||||
|
Shards: []uint16{64},
|
||||||
|
Discv5UdpPort: 9021,
|
||||||
|
TcpPort: 60021,
|
||||||
|
}
|
||||||
|
receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, receiverNode.Start())
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
receiverMultiaddr, err := receiverNode.node.ListenAddresses()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, receiverMultiaddr)
|
||||||
|
// Check that both nodes start with no connected peers
|
||||||
|
dialerPeerCount, err := dialerNode.PeerCount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")
|
||||||
|
receiverPeerCount, err := receiverNode.PeerCount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
|
||||||
|
// Dial
|
||||||
|
err = dialerNode.DialPeer(receiverMultiaddr[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
// Check that both nodes now have one connected peer
|
||||||
|
dialerPeerCount, err = dialerNode.PeerCount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")
|
||||||
|
receiverPeerCount, err = receiverNode.PeerCount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
|
||||||
|
// Stop nodes
|
||||||
|
require.NoError(t, dialerNode.Stop())
|
||||||
|
require.NoError(t, receiverNode.Stop())
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
func TestWakuV2Filter(t *testing.T) {
|
func TestWakuV2Filter(t *testing.T) {
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
//go:build use_nwaku
|
||||||
|
// +build use_nwaku
|
||||||
|
|
||||||
|
package wakuv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type pinger struct {
|
||||||
|
node *WakuNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPinger(node *WakuNode) commonapi.Pinger {
|
||||||
|
return &pinger{
|
||||||
|
node: node,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
|
||||||
|
return p.node.PingPeer(ctx, peerInfo)
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
//go:build use_nwaku
|
||||||
|
// +build use_nwaku
|
||||||
|
|
||||||
|
package wakuv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/api/publish"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nwakuPublisher struct {
|
||||||
|
node *WakuNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPublisher(node *WakuNode) publish.Publisher {
|
||||||
|
return &nwakuPublisher{
|
||||||
|
node: node,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
|
||||||
|
// TODO-nwaku
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
|
||||||
|
return p.node.RelayPublish(ctx, message, pubsubTopic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LightpushPublish publishes a message via WakuLightPush
|
||||||
|
func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) {
|
||||||
|
// TODO-nwaku
|
||||||
|
return pb.MessageHash{}, nil
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
//go:build use_nwaku
|
||||||
|
// +build use_nwaku
|
||||||
|
|
||||||
|
package wakuv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
|
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type storeResultImpl struct {
|
||||||
|
done bool
|
||||||
|
|
||||||
|
node *WakuNode
|
||||||
|
storeRequest *storepb.StoreQueryRequest
|
||||||
|
storeResponse *storepb.StoreQueryResponse
|
||||||
|
peerInfo peer.AddrInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStoreResultImpl(node *WakuNode, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl {
|
||||||
|
return &storeResultImpl{
|
||||||
|
node: node,
|
||||||
|
storeRequest: storeRequest,
|
||||||
|
storeResponse: storeResponse,
|
||||||
|
peerInfo: peerInfo,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Cursor() []byte {
|
||||||
|
return r.storeResponse.GetPaginationCursor()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) IsComplete() bool {
|
||||||
|
return r.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) PeerID() peer.ID {
|
||||||
|
return r.peerInfo.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Query() *storepb.StoreQueryRequest {
|
||||||
|
return r.storeRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Response() *storepb.StoreQueryResponse {
|
||||||
|
return r.storeResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error {
|
||||||
|
// TODO: opts is being ignored. Will require some changes in go-waku. For now using this
|
||||||
|
// is not necessary
|
||||||
|
|
||||||
|
if r.storeResponse.GetPaginationCursor() == nil {
|
||||||
|
r.done = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor
|
||||||
|
|
||||||
|
storeResponse, err := r.node.StoreQuery(ctx, r.storeRequest, r.peerInfo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.storeResponse = storeResponse
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue {
|
||||||
|
return r.storeResponse.GetMessages()
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
//go:build use_nwaku
|
||||||
|
// +build use_nwaku
|
||||||
|
|
||||||
|
package wakuv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/api/publish"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type storenodeMessageVerifier struct {
|
||||||
|
node *WakuNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStorenodeMessageVerifier(node *WakuNode) publish.StorenodeMessageVerifier {
|
||||||
|
return &storenodeMessageVerifier{
|
||||||
|
node: node,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||||
|
requestIDStr := hex.EncodeToString(requestID)
|
||||||
|
storeRequest := &storepb.StoreQueryRequest{
|
||||||
|
RequestId: requestIDStr,
|
||||||
|
MessageHashes: make([][]byte, len(messageHashes)),
|
||||||
|
IncludeData: false,
|
||||||
|
PaginationCursor: nil,
|
||||||
|
PaginationForward: false,
|
||||||
|
PaginationLimit: proto.Uint64(pageSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, mhash := range messageHashes {
|
||||||
|
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := d.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.GetStatusCode() != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc())
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]pb.MessageHash, len(response.Messages))
|
||||||
|
for i, msg := range response.Messages {
|
||||||
|
result[i] = pb.ToMessageHash(msg.GetMessageHash())
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
//go:build use_nwaku
|
||||||
|
// +build use_nwaku
|
||||||
|
|
||||||
|
package wakuv2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type storenodeRequestor struct {
|
||||||
|
node *WakuNode
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStorenodeRequestor(node *WakuNode, logger *zap.Logger) commonapi.StorenodeRequestor {
|
||||||
|
return &storenodeRequestor{
|
||||||
|
node: node,
|
||||||
|
logger: logger.Named("storenodeRequestor"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
|
||||||
|
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
|
||||||
|
logger := s.logger.With(zap.Stringer("peerID", peerInfo.ID), zap.String("requestID", requestIDStr))
|
||||||
|
|
||||||
|
logger.Debug("sending store request")
|
||||||
|
|
||||||
|
storeRequest := &storepb.StoreQueryRequest{
|
||||||
|
RequestId: requestIDStr,
|
||||||
|
MessageHashes: make([][]byte, len(messageHashes)),
|
||||||
|
IncludeData: true,
|
||||||
|
PaginationCursor: nil,
|
||||||
|
PaginationForward: false,
|
||||||
|
PaginationLimit: proto.Uint64(pageSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, mhash := range messageHashes {
|
||||||
|
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) {
|
||||||
|
storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil
|
||||||
|
}
|
Loading…
Reference in New Issue