mirror of https://github.com/status-im/go-waku.git (synced 2025-01-13 15:24:46 +00:00)

refactor: use peerInfo instead of peerID

This commit is contained in:
parent 78b522db50
commit 498022acfe
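In short, every call site that used to hand over a single multiaddr or a bare peer.ID now hands over a []multiaddr.Multiaddr or a peer.AddrInfo (peer ID plus known addresses). A minimal, standalone Go sketch of the conversion the refactored PeerManager.AddPeer relies on, peer.AddrInfosFromP2pAddrs; the two sample addresses are illustrative placeholders, not values from this commit:

package main

import (
	"fmt"
	"log"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

func main() {
	// Two transport addresses carrying the same /p2p/ peer ID (sample values only).
	a1, err := multiaddr.NewMultiaddr("/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")
	if err != nil {
		log.Fatal(err)
	}
	a2, err := multiaddr.NewMultiaddr("/ip4/104.131.131.82/tcp/4002/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")
	if err != nil {
		log.Fatal(err)
	}

	// Addresses sharing a peer ID collapse into a single peer.AddrInfo,
	// which is the shape the refactored store/history code paths pass around.
	infos, err := peer.AddrInfosFromP2pAddrs(a1, a2)
	if err != nil {
		log.Fatal(err)
	}
	for _, info := range infos {
		fmt.Println("peer ID:", info.ID)
		fmt.Println("addresses:", info.Addrs)
	}
}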
@@ -386,7 +386,7 @@ func Execute(options NodeOptions) error {
 	if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
 		logger.Info("retrieving peer info via peer exchange protocol")

-		peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static,
+		peerID, err := wakuNode.AddPeer([]multiaddr.Multiaddr{*options.PeerExchange.Node}, wakupeerstore.Static,
 			pubSubTopicMapKeys, peer_exchange.PeerExchangeID_v20alpha1)
 		if err != nil {
 			logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
@@ -481,7 +481,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) error {
 	for _, addr := range addresses {
-		_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...)
+		_, err := wakuNode.AddPeer([]multiaddr.Multiaddr{addr}, wakupeerstore.Static, pubSubTopics, protocols...)
 		if err != nil {
 			return fmt.Errorf("could not add static peer: %w", err)
 		}
@@ -7,6 +7,7 @@ import (
 	"github.com/go-chi/chi/v5"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/protocol"
+	"github.com/multiformats/go-multiaddr"
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/waku-org/go-waku/cmd/waku/server"
 	"github.com/waku-org/go-waku/logging"
@@ -117,7 +118,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {
 		protos = append(protos, protocol.ID(proto))
 	}

-	id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...)
+	id, err := a.node.AddPeer([]multiaddr.Multiaddr{addr}, peerstore.Static, topics, protos...)
 	if err != nil {
 		a.log.Error("failed to add peer", zap.Error(err))
 		writeErrOrResponse(w, err, nil)
@@ -352,7 +352,7 @@ func AddPeer(instance *WakuInstance, address string, protocolID string) (string,
 		return "", err
 	}

-	peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
+	peerID, err := instance.node.AddPeer([]multiaddr.Multiaddr{ma}, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
 	if err != nil {
 		return "", err
 	}
@@ -8,5 +8,5 @@ import (
)

type StorenodeRequestor interface {
-	Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
+	Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error)
}
@@ -98,6 +98,12 @@ func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) {
func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error {
 	// Handle pinned storenodes
 	m.logger.Info("disconnecting storenode")
+
+	if m.storenodeConfigProvider == nil {
+		m.logger.Debug("storenodeConfigProvider not yet setup")
+		return nil
+	}
+
 	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
 	if err != nil {
 		m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
@@ -252,6 +258,11 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
 		}
 	}

+	if m.storenodeConfigProvider == nil {
+		m.logger.Debug("storenodeConfigProvider not yet setup")
+		return nil
+	}
+
 	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
 	if err != nil {
 		m.logger.Error("Could not obtain the pinned storenode", zap.Error(err))
@@ -338,6 +349,29 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
 	return m.activeStorenode
 }

+func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo {
+	m.RLock()
+	defer m.RUnlock()
+
+	if m.storenodeConfigProvider == nil {
+		m.logger.Debug("storenodeConfigProvider not yet setup")
+		return peer.AddrInfo{}
+	}
+
+	storeNodes, err := m.storenodeConfigProvider.Storenodes()
+	if err != nil {
+		return peer.AddrInfo{}
+	}
+
+	for _, p := range storeNodes {
+		if p.ID == m.activeStorenode {
+			return p
+		}
+	}
+
+	return peer.AddrInfo{}
+}
+
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
 	return m.storenodeStatus(peerID) == connected
}
@@ -37,7 +37,7 @@ type HistoryRetriever struct {

type HistoryProcessor interface {
 	OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
-	OnRequestFailed(requestID []byte, peerID peer.ID, err error)
+	OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error)
}

func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
@@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo
func (hr *HistoryRetriever) Query(
 	ctx context.Context,
 	criteria store.FilterCriteria,
-	storenodeID peer.ID,
+	storenode peer.AddrInfo,
 	pageLimit uint64,
 	shouldProcessNextPage func(int) (bool, uint64),
 	processEnvelopes bool,
@@ -178,7 +178,7 @@ loop:
 			newCriteria.TimeStart = timeStart
 			newCriteria.TimeEnd = timeEnd

-			cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
+			cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
 			queryCancel()

 			if err != nil {
@@ -241,7 +241,7 @@ loop:

func (hr *HistoryRetriever) createMessagesRequest(
 	ctx context.Context,
-	peerID peer.ID,
+	peerInfo peer.AddrInfo,
 	criteria store.FilterCriteria,
 	cursor []byte,
 	limit uint64,
@@ -257,7 +257,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
 		})

 		go func() {
-			storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
+			storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes)
 			resultCh <- struct {
 				storeCursor    []byte
 				envelopesCount int
@@ -273,7 +273,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
 		}
 	} else {
 		go func() {
-			_, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
+			_, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false)
 			if err != nil {
 				logger.Error("failed to request store messages", zap.Error(err))
 			}
@@ -283,9 +283,9 @@ func (hr *HistoryRetriever) createMessagesRequest(
 	return
}

-func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
+func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
 	requestID := protocol.GenerateRequestID()
-	logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
+	logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID))

 	logger.Debug("store.query",
 		logging.Timep("startTime", criteria.TimeStart),
@@ -307,12 +307,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
 	}

 	queryStart := time.Now()
-	result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
+	result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest)
 	queryDuration := time.Since(queryStart)
 	if err != nil {
 		logger.Error("error querying storenode", zap.Error(err))

-		hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
+		hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err)

 		return nil, 0, err
 	}
@@ -70,7 +70,7 @@ func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelop
 	return nil
}

-func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
+func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) {
}

func newMockHistoryProcessor() *mockHistoryProcessor {
@@ -92,7 +92,7 @@ func getInitialResponseKey(contentTopics []string) string {
 	return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

-func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
+func (t *mockStore) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
 	result := &mockResult{}
 	if len(storeQueryRequest.GetPaginationCursor()) == 0 {
 		initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics())
@@ -218,7 +218,7 @@ func TestSuccessBatchExecution(t *testing.T) {
 		ContentFilter: protocol.NewContentFilter("test", topics...),
 	}

-	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
+	err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
 	require.NoError(t, err)
}
@@ -246,6 +246,6 @@ func TestFailedBatchExecution(t *testing.T) {
 		ContentFilter: protocol.NewContentFilter("test", topics...),
 	}

-	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
+	err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
 	require.Error(t, err)
}
@@ -10,7 +10,7 @@ import (
)

type criteriaInterest struct {
-	peerID        peer.ID
+	peerInfo      peer.AddrInfo
 	contentFilter protocol.ContentFilter
 	lastChecked   time.Time
@@ -19,7 +19,7 @@ type criteriaInterest struct {
}

func (c criteriaInterest) equals(other criteriaInterest) bool {
-	if c.peerID != other.peerID {
+	if c.peerInfo.ID != other.peerInfo.ID {
 		return false
 	}
@@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct {
 	store *store.WakuStore
}

-func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
-	return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
+func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
+	return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
}

-func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
-	return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
+func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
+	return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest)
}
@@ -71,13 +71,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
 	}
}

-func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) {
+func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) {
 	m.criteriaInterestMu.Lock()
 	defer m.criteriaInterestMu.Unlock()

 	ctx, cancel := context.WithCancel(m.ctx)
 	criteriaInterest := criteriaInterest{
-		peerID:        peerID,
+		peerInfo:      peerInfo,
 		contentFilter: contentFilter,
 		lastChecked:   m.timesource.Now().Add(-m.params.delay),
 		ctx:           ctx,
@@ -190,7 +190,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
 		}

 		m.logger.Error("could not fetch history",
-			zap.Stringer("peerID", interest.peerID),
+			zap.Stringer("peerID", interest.peerInfo.ID),
 			zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
 			zap.Strings("contentTopics", contentTopics))
 		continue
@@ -233,7 +233,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 	contentTopics := interest.contentFilter.ContentTopics.ToList()

 	logger := m.logger.With(
-		zap.Stringer("peerID", interest.peerID),
+		zap.Stringer("peerID", interest.peerInfo.ID),
 		zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
 		zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
 		logging.Epoch("from", interest.lastChecked),
@@ -252,7 +252,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,

 		return m.storenodeRequestor.Query(
 			ctx,
-			interest.peerID,
+			interest.peerInfo,
 			storeQueryRequest,
 		)
 	}, logger, "retrieving history to check for missing messages")
@@ -335,7 +335,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 			PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
 		}

-		return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
+		return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest)
 	}, logger, "retrieving missing messages")
 	if err != nil {
 		if !errors.Is(err, context.Canceled) {
@@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct {
 	store *store.WakuStore
}

-func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
+func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
 	var opts []store.RequestOption
 	opts = append(opts, store.WithRequestID(requestID))
-	opts = append(opts, store.WithPeer(peerID))
+	opts = append(opts, store.WithPeerAddr(peerID.Addrs...))
 	opts = append(opts, store.WithPaging(false, pageSize))
 	opts = append(opts, store.IncludeData(false))
@@ -33,7 +33,7 @@ type ISentCheck interface {

type StorenodeMessageVerifier interface {
 	// MessagesExist returns a list of the messages it found from a list of message hashes
-	MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
+	MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

// MessageSentCheck tracks the outgoing messages and check against store node
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
|
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
|
||||||
selectedPeer := m.storenodeCycle.GetActiveStorenode()
|
selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo()
|
||||||
if selectedPeer == "" {
|
if selectedPeer.ID == "" {
|
||||||
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
|
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
|
||||||
return []common.Hash{}
|
return []common.Hash{}
|
||||||
}
|
}
|
||||||
@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
|
|||||||
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
|
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
|
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes))
|
||||||
|
|
||||||
queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
|
queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
|
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
|
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err))
|
||||||
return []common.Hash{}
|
return []common.Hash{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -79,7 +79,7 @@ func TestPeriodicKeepAlive(t *testing.T) {

 	node2MAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String())
 	require.NoError(t, err)
-	_, err = wakuNode.AddPeer(node2MAddr, wps.Static, []string{"waku/rs/1/1"})
+	_, err = wakuNode.AddPeer([]multiaddr.Multiaddr{node2MAddr}, wps.Static, []string{"waku/rs/1/1"})
 	require.NoError(t, err)

 	time.Sleep(time.Second * 2)
@@ -703,8 +703,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro

// AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
-func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
-	pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
+func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
+	pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...)
 	if err != nil {
 		return "", err
 	}
@@ -268,7 +268,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
 	require.NoError(t, err)
 	defer wakuNode2.Stop()

-	peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
+	peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
 	require.NoError(t, err)

 	subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
@@ -317,7 +317,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
 	require.NoError(t, err)
 	defer wakuNode3.Stop()

-	_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
+	_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
 	require.NoError(t, err)
 	time.Sleep(2 * time.Second)
 	// NODE2 should have returned the message received via filter
@@ -678,13 +678,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu
}

// AddPeer adds peer to the peerStore and also to service slots
-func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
+func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
 	//Assuming all addresses have peerId
-	info, err := peer.AddrInfoFromP2pAddr(address)
+	infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...)
 	if err != nil {
 		return nil, err
 	}

+	if len(infoArr) > 1 {
+		return nil, errors.New("only a single peerID is expected in AddPeer")
+	}
+
+	info := infoArr[0]
+
 	//Add Service peers to serviceSlots.
 	for _, proto := range protocols {
 		pm.addPeerToServiceSlot(proto, info.ID)
@@ -698,10 +704,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo

 	pData := &service.PeerData{
 		Origin: origin,
-		AddrInfo: peer.AddrInfo{
-			ID:    info.ID,
-			Addrs: info.Addrs,
-		},
+		AddrInfo:     info,
 		PubsubTopics: pubsubTopics,
 	}
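For context on the guard added above: peer.AddrInfosFromP2pAddrs groups the supplied addresses by peer ID, so the refactored AddPeer rejects input that resolves to more than one peer. A small standalone sketch of that behavior (the two bootstrap addresses are sample inputs only, not values from this commit):

package main

import (
	"fmt"
	"log"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

func main() {
	// Addresses of two different peers.
	a1, err := multiaddr.NewMultiaddr("/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")
	if err != nil {
		log.Fatal(err)
	}
	a2, err := multiaddr.NewMultiaddr("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
	if err != nil {
		log.Fatal(err)
	}

	infos, err := peer.AddrInfosFromP2pAddrs(a1, a2)
	if err != nil {
		log.Fatal(err)
	}

	// len(infos) == 2 here; the refactored PeerManager.AddPeer returns an error
	// in this case because it expects all addresses to belong to a single peer.
	fmt.Println("distinct peers:", len(infos))
}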
@@ -12,6 +12,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"github.com/waku-org/go-waku/tests"
@@ -57,7 +58,7 @@ func TestServiceSlots(t *testing.T) {

 	// add h2 peer to peer manager
 	t.Log(h2.ID())
-	_, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol))
 	require.NoError(t, err)

 	///////////////
@@ -71,7 +72,7 @@ func TestServiceSlots(t *testing.T) {
 	require.Equal(t, h2.ID(), peers[0])

 	// add h3 peer to peer manager
-	_, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol))
 	require.NoError(t, err)

 	// check that returned peer is h2 or h3 peer
@@ -96,7 +97,7 @@ func TestServiceSlots(t *testing.T) {
 	require.Error(t, err, utils.ErrNoPeersAvailable)

 	// add h4 peer for protocol1
-	_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h4)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
 	require.NoError(t, err)

 	//Test peer selection for protocol1
@@ -124,10 +125,10 @@ func TestPeerSelection(t *testing.T) {
 	defer h3.Close()

 	protocol := libp2pProtocol.ID("test/protocol")
-	_, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
 	require.NoError(t, err)

-	_, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
 	require.NoError(t, err)

 	_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
@@ -158,7 +159,7 @@ func TestPeerSelection(t *testing.T) {
 	h4, err := tests.MakeHost(ctx, 0, rand.Reader)
 	require.NoError(t, err)
 	defer h4.Close()
-	_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h4)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
 	require.NoError(t, err)

 	peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
@@ -185,7 +186,7 @@ func TestDefaultProtocol(t *testing.T) {
 	defer h5.Close()

 	//Test peer selection for relay protocol from peer store
-	_, err = pm.AddPeer(tests.GetAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h5)}, wps.Static, []string{""}, relay.WakuRelayID_v200)
 	require.NoError(t, err)

 	// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
@@ -206,7 +207,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
 	require.NoError(t, err)
 	defer h6.Close()

-	_, err = pm.AddPeer(tests.GetAddr(h6), wps.Static, []string{""}, protocol2)
+	_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h6)}, wps.Static, []string{""}, protocol2)
 	require.NoError(t, err)

 	peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
@@ -17,6 +17,7 @@ import (
 	libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/net/swarm"
 	"github.com/libp2p/go-msgio/pbio"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/logging"
 	"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@@ -348,7 +349,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,

 	//Add Peer to peerstore.
 	if params.pm != nil && params.peerAddr != nil {
-		pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
+		pData, err := wf.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -10,6 +10,7 @@ import (

 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/suite"
 	"github.com/waku-org/go-waku/tests"
@@ -102,7 +103,7 @@ func (s *FilterTestSuite) TearDownTest() {

func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
 	mAddr := tests.GetAddr(h2.h)
-	_, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
+	_, err := h1.pm.AddPeer([]multiaddr.Multiaddr{mAddr}, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
 	s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
 	s.Require().NoError(err)
}
@@ -310,7 +310,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR

 	//Add Peer to peerstore.
 	if store.pm != nil && params.peerAddr != nil {
-		pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
+		pData, err := store.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, pubsubTopics, StoreID_v20beta4)
 		if err != nil {
 			return nil, err
 		}
@@ -15,6 +15,7 @@ import (
 	libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/net/swarm"
 	"github.com/libp2p/go-msgio/pbio"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/logging"
 	"github.com/waku-org/go-waku/waku/v2/peermanager"
@@ -278,7 +279,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
 	}

 	if params.pm != nil && params.peerAddr != nil {
-		pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
+		pData, err := wakuLP.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
 		if err != nil {
 			return nil, err
 		}
@@ -10,6 +10,7 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-msgio/pbio"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/waku-org/go-waku/waku/v2/peermanager"
 	"github.com/waku-org/go-waku/waku/v2/peerstore"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
@@ -36,7 +37,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
 	}

 	if params.pm != nil && params.peerAddr != nil {
-		pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
+		pData, err := wakuPX.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
 		if err != nil {
 			return err
 		}
@@ -196,15 +196,15 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
 	return result, nil
}

-func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
+func (s *WakuStore) RequestRaw(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *pb.StoreQueryRequest) (Result, error) {
 	err := storeRequest.Validate()
 	if err != nil {
 		return nil, err
 	}

 	var params Parameters
-	params.selectedPeer = peerID
-	if params.selectedPeer == "" {
+	params.peerAddr = peerInfo.Addrs
+	if len(params.peerAddr) == 0 {
 		return nil, ErrMustSelectPeer
 	}
@@ -11,7 +11,7 @@ import (

type Parameters struct {
 	selectedPeer      peer.ID
-	peerAddr          multiaddr.Multiaddr
+	peerAddr          []multiaddr.Multiaddr
 	peerSelectionType peermanager.PeerSelection
 	preferredPeers    peer.IDSlice
 	requestID         []byte
|
|||||||
func WithPeer(p peer.ID) RequestOption {
|
func WithPeer(p peer.ID) RequestOption {
|
||||||
return func(params *Parameters) error {
|
return func(params *Parameters) error {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
if params.peerAddr != nil {
|
if len(params.peerAddr) != 0 {
|
||||||
return errors.New("WithPeer and WithPeerAddr options are mutually exclusive")
|
return errors.New("WithPeer and WithPeerAddr options are mutually exclusive")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -43,7 +43,7 @@ func WithPeer(p peer.ID) RequestOption {
|
|||||||
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||||
// This new peer will be added to peerStore.
|
// This new peer will be added to peerStore.
|
||||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
|
func WithPeerAddr(pAddr ...multiaddr.Multiaddr) RequestOption {
|
||||||
return func(params *Parameters) error {
|
return func(params *Parameters) error {
|
||||||
params.peerAddr = pAddr
|
params.peerAddr = pAddr
|
||||||
if params.selectedPeer != "" {
|
if params.selectedPeer != "" {
|
||||||
|
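Usage-wise, the now-variadic WithPeerAddr lets a caller hand every known address of a storenode to a single option value, which is then consumed by calls such as WakuStore.Request or QueryByHash shown in this commit. A small sketch, assuming the option lives in the store client package imported below; the two addresses are placeholders:

package main

import (
	"fmt"
	"log"

	"github.com/multiformats/go-multiaddr"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
)

func main() {
	a1, err := multiaddr.NewMultiaddr("/ip4/192.0.2.1/tcp/60000")
	if err != nil {
		log.Fatal(err)
	}
	a2, err := multiaddr.NewMultiaddr("/dns4/storenode.example.org/tcp/60000")
	if err != nil {
		log.Fatal(err)
	}

	// Build the request option with several addresses of the same storenode;
	// a store query would apply it later.
	opt := store.WithPeerAddr(a1, a2)
	_ = opt
	fmt.Println("store request option built for", a1, "and", a2)
}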