From 28ee34195913d6fe8974dba80cb7e71ee198ff18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 11 Nov 2024 09:19:50 +0700 Subject: [PATCH] refactor_: extract libwaku calls into WakuNode struct (#6027) --- .../waku/v2/api/common/storenode_requestor.go | 2 +- .../go-waku/waku/v2/api/history/cycle.go | 18 + .../go-waku/waku/v2/api/history/history.go | 20 +- .../waku/v2/api/missing/criteria_interest.go | 4 +- .../waku/v2/api/missing/default_requestor.go | 8 +- .../waku/v2/api/missing/missing_messages.go | 12 +- .../waku/v2/api/publish/default_verifier.go | 4 +- .../waku/v2/api/publish/message_check.go | 10 +- .../go-waku/waku/v2/node/wakunode2.go | 4 +- .../waku/v2/peermanager/peer_manager.go | 17 +- .../go-waku/waku/v2/protocol/filter/client.go | 3 +- .../waku/v2/protocol/filter/test_utils.go | 3 +- .../legacy_store/waku_store_client.go | 2 +- .../v2/protocol/lightpush/waku_lightpush.go | 3 +- .../waku/v2/protocol/peer_exchange/client.go | 3 +- .../go-waku/waku/v2/protocol/store/client.go | 6 +- .../go-waku/waku/v2/protocol/store/options.go | 6 +- wakuv2/gowaku.go | 2 +- wakuv2/history_processor_wrapper.go | 4 +- wakuv2/nwaku.go | 1810 ++++++++--------- wakuv2/nwaku_test.go | 326 +-- wakuv2/pinger.go | 26 + wakuv2/publisher.go | 37 + wakuv2/result.go | 77 + wakuv2/storenode_message_verifier.go | 59 + wakuv2/storenode_requestor.go | 76 + 26 files changed, 1423 insertions(+), 1119 deletions(-) create mode 100644 wakuv2/pinger.go create mode 100644 wakuv2/publisher.go create mode 100644 wakuv2/result.go create mode 100644 wakuv2/storenode_message_verifier.go create mode 100644 wakuv2/storenode_requestor.go diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go index 8a723c9e6..a5076b3f6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go @@ -8,5 +8,5 @@ import ( ) type StorenodeRequestor interface { - Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error) + Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index eb51ba040..5a13966b6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -337,6 +337,24 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID { return m.activeStorenode } +func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo { + m.RLock() + defer m.RUnlock() + + storeNodes, err := m.storenodeConfigProvider.Storenodes() + if err != nil { + return peer.AddrInfo{} + } + + for _, p := range storeNodes { + if p.ID == m.activeStorenode { + return p + } + } + + return peer.AddrInfo{} +} + func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool { return m.storenodeStatus(peerID) == connected } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go index c31a79dae..da89f4817 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go @@ -37,7 +37,7 @@ type HistoryRetriever struct { type 
HistoryProcessor interface { OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error - OnRequestFailed(requestID []byte, peerID peer.ID, err error) + OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) } func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { @@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo func (hr *HistoryRetriever) Query( ctx context.Context, criteria store.FilterCriteria, - storenodeID peer.ID, + storenode peer.AddrInfo, pageLimit uint64, shouldProcessNextPage func(int) (bool, uint64), processEnvelopes bool, @@ -159,7 +159,7 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger) + cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger) queryCancel() if err != nil { @@ -212,7 +212,7 @@ loop: func (hr *HistoryRetriever) createMessagesRequest( ctx context.Context, - peerID peer.ID, + peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, @@ -228,7 +228,7 @@ func (hr *HistoryRetriever) createMessagesRequest( }) go func() { - storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes) + storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes) resultCh <- struct { storeCursor []byte envelopesCount int @@ -244,7 +244,7 @@ func (hr *HistoryRetriever) createMessagesRequest( } } else { go func() { - _, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false) + _, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false) if err != nil { logger.Error("failed to request store messages", zap.Error(err)) } @@ -254,9 +254,9 @@ func (hr *HistoryRetriever) createMessagesRequest( return } -func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { +func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { requestID := protocol.GenerateRequestID() - logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) + logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID)) logger.Debug("store.query", logging.Timep("startTime", criteria.TimeStart), @@ -278,12 +278,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee } queryStart := time.Now() - result, err := hr.store.Query(ctx, peerID, storeQueryRequest) + result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) - hr.historyProcessor.OnRequestFailed(requestID, peerID, err) + hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err) return nil, 0, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go index 
919b2fc91..19aa7b84d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go @@ -10,7 +10,7 @@ import ( ) type criteriaInterest struct { - peerID peer.ID + peerInfo peer.AddrInfo contentFilter protocol.ContentFilter lastChecked time.Time @@ -19,7 +19,7 @@ type criteriaInterest struct { } func (c criteriaInterest) equals(other criteriaInterest) bool { - if c.peerID != other.peerID { + if c.peerInfo.ID != other.peerInfo.ID { return false } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go index 382821735..a72af3c55 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go @@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct { store *store.WakuStore } -func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { - return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) +func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { + return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { - return d.store.RequestRaw(ctx, peerID, storeQueryRequest) +func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { + return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 72ac4f9f3..ab187af42 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -66,13 +66,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes } } -func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) { +func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) { m.criteriaInterestMu.Lock() defer m.criteriaInterestMu.Unlock() ctx, cancel := context.WithCancel(m.ctx) criteriaInterest := criteriaInterest{ - peerID: peerID, + peerInfo: peerInfo, contentFilter: contentFilter, lastChecked: m.timesource.Now().Add(-m.params.delay), ctx: ctx, @@ -164,7 +164,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } m.logger.Error("could not fetch history", - zap.Stringer("peerID", interest.peerID), + zap.Stringer("peerID", interest.peerInfo.ID), zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), zap.Strings("contentTopics", contentTopics)) continue @@ -207,7 +207,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, contentTopics := interest.contentFilter.ContentTopics.ToList() 
logger := m.logger.With( - zap.Stringer("peerID", interest.peerID), + zap.Stringer("peerID", interest.peerInfo.ID), zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]), zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), logging.Epoch("from", interest.lastChecked), @@ -226,7 +226,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, return m.storenodeRequestor.Query( ctx, - interest.peerID, + interest.peerInfo, storeQueryRequest, ) }, logger, "retrieving history to check for missing messages") @@ -309,7 +309,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, PaginationLimit: proto.Uint64(maxMsgHashesPerRequest), } - return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest) + return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go index 68eca0304..386728ece 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go @@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct { store *store.WakuStore } -func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { +func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { var opts []store.RequestOption opts = append(opts, store.WithRequestID(requestID)) - opts = append(opts, store.WithPeer(peerID)) + opts = append(opts, store.WithPeerAddr(peerID.Addrs...)) opts = append(opts, store.WithPaging(false, pageSize)) opts = append(opts, store.IncludeData(false)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index 8a37e20ce..c091e9592 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -33,7 +33,7 @@ type ISentCheck interface { type StorenodeMessageVerifier interface { // MessagesExist returns a list of the messages it found from a list of message hashes - MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) + MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) } // MessageSentCheck tracks the outgoing messages and check against store node @@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() { } func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash { - selectedPeer := m.storenodeCycle.GetActiveStorenode() - if selectedPeer == "" { + selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo() + if selectedPeer.ID == "" { m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) return []common.Hash{} } @@ -224,13 +224,13 @@ func (m 
*MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c messageHashes[i] = pb.ToMessageHash(hash.Bytes()) } - m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) + m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes)) queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) defer cancel() result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes) if err != nil { - m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) + m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err)) return []common.Hash{} } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 2de62cc7e..b8f895f84 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -699,8 +699,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. -func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) +func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { + pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...) if err != nil { return "", err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index c543cbe8e..6321471af 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -684,13 +684,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu } // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { +func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { //Assuming all addresses have peerId - info, err := peer.AddrInfoFromP2pAddr(address) + infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...) if err != nil { return nil, err } + if len(infoArr) > 1 { + return nil, errors.New("only a single peerID is expected in AddPeer") + } + + info := infoArr[0] + //Add Service peers to serviceSlots. 
for _, proto := range protocols { pm.addPeerToServiceSlot(proto, info.ID) @@ -703,11 +709,8 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo } pData := &service.PeerData{ - Origin: origin, - AddrInfo: peer.AddrInfo{ - ID: info.ID, - Addrs: info.Addrs, - }, + Origin: origin, + AddrInfo: info, PubsubTopics: pubsubTopics, } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3b56d4700..29c292ebf 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/onlinechecker" @@ -325,7 +326,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, //Add Peer to peerstore. if params.pm != nil && params.peerAddr != nil { - pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) + pData, err := wf.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) if err != nil { return nil, nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go index 015cb352e..322936388 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" @@ -101,7 +102,7 @@ func (s *FilterTestSuite) TearDownTest() { func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) { mAddr := tests.GetAddr(h2.h) - _, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) + _, err := h1.pm.AddPeer([]multiaddr.Multiaddr{mAddr}, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) s.Log.Info("add peer", zap.Stringer("mAddr", mAddr)) s.Require().NoError(err) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index ef971f003..61781e44c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -310,7 +310,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR //Add Peer to peerstore. 
if store.pm != nil && params.peerAddr != nil { - pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + pData, err := store.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, pubsubTopics, StoreID_v20beta4) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315..c973fe962 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -273,7 +274,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } if params.pm != nil && params.peerAddr != nil { - pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) + pData, err := wakuLP.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index ef1f7bb9a..94d702035 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -36,7 +37,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } if params.pm != nil && params.peerAddr != nil { - pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1) + pData, err := wakuPX.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{}, PeerExchangeID_v20alpha1) if err != nil { return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index febb863e5..29f0bf038 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -194,15 +194,15 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return result, nil } -func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) { +func (s *WakuStore) RequestRaw(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *pb.StoreQueryRequest) (Result, error) { err := storeRequest.Validate() if err != nil { return nil, err } var params Parameters - params.selectedPeer = peerID - if params.selectedPeer == "" { + params.peerAddr = peerInfo.Addrs + if len(params.peerAddr) == 0 { return nil, ErrMustSelectPeer } diff --git 
a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go index facb3f54f..e6218cc7c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go @@ -11,7 +11,7 @@ import ( type Parameters struct { selectedPeer peer.ID - peerAddr multiaddr.Multiaddr + peerAddr []multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -33,7 +33,7 @@ type RequestOption func(*Parameters) error func WithPeer(p peer.ID) RequestOption { return func(params *Parameters) error { params.selectedPeer = p - if params.peerAddr != nil { + if len(params.peerAddr) != 0 { return errors.New("WithPeer and WithPeerAddr options are mutually exclusive") } return nil @@ -43,7 +43,7 @@ func WithPeer(p peer.ID) RequestOption { // WithPeerAddr is an option used to specify a peerAddress to request the message history. // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { +func WithPeerAddr(pAddr ...multiaddr.Multiaddr) RequestOption { return func(params *Parameters) error { params.peerAddr = pAddr if params.selectedPeer != "" { diff --git a/wakuv2/gowaku.go b/wakuv2/gowaku.go index 5906f6957..cecc59f68 100644 --- a/wakuv2/gowaku.go +++ b/wakuv2/gowaku.go @@ -1869,7 +1869,7 @@ func (w *Waku) timestamp() int64 { } func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { - peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200) + peerID, err := w.node.AddPeer([]multiaddr.Multiaddr{address}, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200) if err != nil { return "", err } diff --git a/wakuv2/history_processor_wrapper.go b/wakuv2/history_processor_wrapper.go index 56c3f8cf4..fe4ed93f6 100644 --- a/wakuv2/history_processor_wrapper.go +++ b/wakuv2/history_processor_wrapper.go @@ -21,6 +21,6 @@ func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnv return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) } -func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { - hr.waku.onHistoricMessagesRequestFailed(requestID, peerID, err) +func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) { + hr.waku.onHistoricMessagesRequestFailed(requestID, peerInfo, err) } diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index a0e310b0f..bf70e0101 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -318,7 +318,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "runtime" "strconv" "strings" @@ -327,7 +326,6 @@ import ( "time" "unsafe" - "github.com/golang/protobuf/proto" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -347,7 +345,7 @@ import ( "github.com/libp2p/go-libp2p/core/metrics" - commonapi "github.com/waku-org/go-waku/waku/v2/api/common" + libp2pproto "github.com/libp2p/go-libp2p/core/protocol" filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" "github.com/waku-org/go-waku/waku/v2/api/history" @@ -360,6 +358,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + 
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -434,7 +433,7 @@ type WakuConfig struct { // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { - wakuCtx unsafe.Pointer + node *WakuNode appDB *sql.DB @@ -504,7 +503,7 @@ type Waku struct { // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery discV5BootstrapNodes []string - onHistoricMessagesRequestFailed func([]byte, peer.ID, error) + onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error) onPeerStats func(types.ConnStatus) statusTelemetryClient ITelemetryClient @@ -523,12 +522,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { } // New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { - // Lock the main goroutine to its current OS thread - runtime.LockOSThread() - - WakuSetup() // This should only be called once in the whole app's life - +func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { node, err := wakuNew(nodeKey, fleet, cfg, @@ -539,20 +533,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon return nil, err } - defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() - if err != nil { - return nil, err - } - - if nwakuCfg.EnableRelay { - err = node.WakuRelaySubscribe(defaultPubsubTopic) - if err != nil { - return nil, err - } - } - - node.WakuSetEventCallback() - return node, nil // TODO-nwaku @@ -850,7 +830,6 @@ func (w *Waku) retryDnsDiscoveryWithBackoff(ctx context.Context, addr string, su } } } - */ func (w *Waku) discoverAndConnectPeers() { @@ -859,14 +838,14 @@ func (w *Waku) discoverAndConnectPeers() { if nameserver == "" { nameserver = "8.8.8.8" } - timeout := int(requestTimeout / time.Millisecond) for _, addrString := range w.cfg.WakuNodes { addrString := addrString if strings.HasPrefix(addrString, "enrtree://") { // Use DNS Discovery - res, err := w.WakuDnsDiscovery(addrString, nameserver, timeout) + ctx, _ := context.WithTimeout(w.ctx, requestTimeout) + res, err := w.node.DnsDiscovery(ctx, addrString, nameserver) if err != nil { w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString)) continue @@ -885,20 +864,31 @@ func (w *Waku) discoverAndConnectPeers() { addrsToConnect = append(addrsToConnect, addr) } } - // Now connect to all the Multiaddresses for _, ma := range addrsToConnect { - w.WakuConnect(ma.String(), timeout) + ctx, _ := context.WithTimeout(w.ctx, requestTimeout) + w.node.Connect(ctx, ma) } - } func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { defer gocommon.LogOnPanic() // Connection will be prunned eventually by the connection manager if needed // The peer connector in go-waku uses Connect, so it will execute identify as part of its + 
+ // TODO-nwaku + // TODO: is enr and origin required? + // TODO: this function is meant to add a node to a peer store so it can be picked up by the peer manager + // so probably we shouldn't connect directly but expose an AddPeer function in libwaku + + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + addr := peerInfo.Addrs[0] - w.WakuConnect(addr.String(), 1000) + err := w.node.Connect(ctx, addr) + if err != nil { + w.logger.Error("couldn't connect to peer", zap.Error(err), zap.Stringer("peerID", peerInfo.ID)) + } } /* TODO-nwaku @@ -993,17 +983,9 @@ func (w *Waku) GetPubsubTopic(topic string) string { return topic } -/* TODO-nwaku func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { topic = w.GetPubsubTopic(topic) - - if !w.node.Relay().IsSubscribed(topic) { - return nil - } - - contentFilter := protocol.NewContentFilter(topic) - - return w.node.Relay().Unsubscribe(w.ctx, contentFilter) + return w.node.RelayUnsubscribe(topic) } func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { @@ -1013,6 +995,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P topic = w.GetPubsubTopic(topic) + /* TODO nwaku if w.node.Relay().IsSubscribed(topic) { return nil } @@ -1023,10 +1006,9 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P return err } } + */ - contentFilter := protocol.NewContentFilter(topic) - - sub, err := w.node.Relay().Subscribe(w.ctx, contentFilter) + err := w.node.RelaySubscribe(topic) if err != nil { return err } @@ -1038,23 +1020,23 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P for { select { case <-w.ctx.Done(): - err := w.node.Relay().Unsubscribe(w.ctx, contentFilter) + err := w.node.RelayUnsubscribe(topic) if err != nil && !errors.Is(err, context.Canceled) { w.logger.Error("could not unsubscribe", zap.Error(err)) } return - case env := <-sub[0].Ch: + // TODO-nwaku + /*case env := <-sub[0].Ch: err := w.OnNewEnvelopes(env, common.RelayedMessageType, false) if err != nil { w.logger.Error("OnNewEnvelopes error", zap.Error(err)) - } + }*/ } } }() return nil } -*/ // MaxMessageSize returns the maximum accepted message size. func (w *Waku) MaxMessageSize() uint32 { @@ -1393,7 +1375,7 @@ func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. 
func (w *Waku) Start() error { - err := w.WakuStart() + err := w.node.Start() if err != nil { return fmt.Errorf("failed to start nwaku node: %v", err) } @@ -1414,12 +1396,11 @@ func (w *Waku) Start() error { return fmt.Errorf("failed to start go-waku node: %v", err) } */ - w.StorenodeCycle = history.NewStorenodeCycle(w.logger, newPinger(w.wakuCtx)) - - w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger) + w.StorenodeCycle = history.NewStorenodeCycle(w.logger, newPinger(w.node)) + w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.node, w.logger), NewHistoryProcessorWrapper(w), w.logger) w.StorenodeCycle.Start(w.ctx) - peerID, err := w.PeerID() + peerID, err := w.node.PeerID() if err != nil { return err } @@ -1490,7 +1471,7 @@ func (w *Waku) Start() error { if w.cfg.EnableMissingMessageVerification { w.missingMsgVerifier = missing.NewMissingMessageVerifier( - newStorenodeRequestor(w.wakuCtx, w.logger), + newStorenodeRequestor(w.node, w.logger), w, w.timesource, w.logger) @@ -1644,7 +1625,7 @@ func (w *Waku) startMessageSender() error { publishMethod = publish.LightPush }*/ - sender, err := publish.NewMessageSender(publishMethod, newPublisher(w.wakuCtx), w.logger) + sender, err := publish.NewMessageSender(publishMethod, newPublisher(w.node), w.logger) if err != nil { w.logger.Error("failed to create message sender", zap.Error(err)) return err @@ -1653,7 +1634,7 @@ func (w *Waku) startMessageSender() error { if w.cfg.EnableStoreConfirmationForMessagesSent { msgStoredChan := make(chan gethcommon.Hash, 1000) msgExpiredChan := make(chan gethcommon.Hash, 1000) - messageSentCheck := publish.NewMessageSentCheck(w.ctx, newStorenodeMessageVerifier(w.wakuCtx), w.StorenodeCycle, w.timesource, msgStoredChan, msgExpiredChan, w.logger) + messageSentCheck := publish.NewMessageSentCheck(w.ctx, newStorenodeMessageVerifier(w.node), w.StorenodeCycle, w.timesource, msgStoredChan, msgExpiredChan, w.logger) sender.WithMessageSentCheck(messageSentCheck) w.wg.Add(1) @@ -1706,13 +1687,12 @@ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { return w.envelopeCache.Has(gethcommon.Hash(mh)), nil } -/* TODO-nwaku -func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { +func (w *Waku) SetTopicsToVerifyForMissingMessages(peerInfo peer.AddrInfo, pubsubTopic string, contentTopics []string) { if !w.cfg.EnableMissingMessageVerification { return } - w.missingMsgVerifier.SetCriteriaInterest(peerID, protocol.NewContentFilter(pubsubTopic, contentTopics...)) + w.missingMsgVerifier.SetCriteriaInterest(peerInfo, protocol.NewContentFilter(pubsubTopic, contentTopics...)) } func (w *Waku) setupRelaySubscriptions() error { @@ -1741,7 +1721,7 @@ func (w *Waku) setupRelaySubscriptions() error { } return nil -} */ +} // Stop implements node.Service, stopping the background data propagation thread // of the Waku protocol. 
@@ -1750,7 +1730,7 @@ func (w *Waku) Stop() error { w.envelopeCache.Stop() - err := w.WakuStop() + err := w.node.Stop() if err != nil { return err } @@ -1942,12 +1922,7 @@ func (w *Waku) ClearEnvelopesCache() { } func (w *Waku) PeerCount() (int, error) { - peerCount, err := w.GetNumConnectedPeers() - if err != nil { - return 0, err - } - - return peerCount, nil + return w.node.GetNumConnectedPeers() } // TODO-nwaku @@ -2204,23 +2179,19 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { // Using WakuConnect so it matches the go-waku's behavior and terminology - return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond)) + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.Connect(ctx, address) } -func (self *Waku) DropPeer(peerId peer.ID) error { - var resp = C.allocResp() - var cPeerId = C.CString(peerId.String()) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) +func (w *Waku) DialPeerByID(peerID peer.ID) error { + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.DialPeerByID(ctx, peerID, relay.WakuRelayID_v200) +} - C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error DisconnectPeerById: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) +func (w *Waku) DropPeer(peerID peer.ID) error { + return w.node.DisconnectPeerByID(peerID) } func (w *Waku) ProcessingP2PMessages() bool { @@ -2246,22 +2217,7 @@ func (w *Waku) Clean() error { } func (w *Waku) PeerID() (peer.ID, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetMyPeerId(w.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - - peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - id, err := peer.Decode(peerIdStr) - if err != nil { - errMsg := "WakuGetMyPeerId - decoding peerId: %w" - return "", fmt.Errorf(errMsg, err) - } - return id, nil - } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg) + return w.node.PeerID() } // validatePrivateKey checks the format of the given private key. 
@@ -2334,10 +2290,6 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store { return nil } -func WakuSetup() { - C.waku_setup() -} - func printStackTrace() { // Create a buffer to hold the stack trace buf := make([]byte, 102400) @@ -2354,7 +2306,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, - onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { + onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { var err error if logger == nil { @@ -2377,479 +2329,234 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, } logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg), zap.Any("wakuCfg", cfg)) - jsonConfig, err := json.Marshal(nwakuCfg) + ctx, cancel := context.WithCancel(context.Background()) + + wakunode, err := newWakuNode(ctx, nwakuCfg) if err != nil { + cancel() return nil, err } - var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - wakuCtx := C.cGoWakuNew(cJsonConfig, resp) // Notice that the events for self node are handled by the 'MyEventCallback' method - if C.getRet(resp) == C.RET_OK { - ctx, cancel := context.WithCancel(context.Background()) - return &Waku{ - wakuCtx: wakuCtx, - wakuCfg: nwakuCfg, - cfg: cfg, - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopeCache: newTTLCache(), - msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), - topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), - connectionNotifChan: make(chan node.PeerConnection, 20), - connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), - ctx: ctx, - cancel: cancel, - wg: sync.WaitGroup{}, - dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), - dnsAddressCacheLock: &sync.RWMutex{}, - dnsDiscAsyncRetrievedSignal: make(chan struct{}), - storeMsgIDs: make(map[gethcommon.Hash]bool), - timesource: ts, - storeMsgIDsMu: sync.RWMutex{}, - logger: logger, - discV5BootstrapNodes: nwakuCfg.Discv5BootstrapNodes, - onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, - onPeerStats: onPeerStats, - onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), - }, nil - } + return &Waku{ + node: wakunode, + wakuCfg: nwakuCfg, + cfg: cfg, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopeCache: newTTLCache(), + msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), + topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), + connectionNotifChan: make(chan node.PeerConnection, 20), + connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), + dnsAddressCacheLock: &sync.RWMutex{}, + dnsDiscAsyncRetrievedSignal: make(chan struct{}), + storeMsgIDs: make(map[gethcommon.Hash]bool), + timesource: ts, + storeMsgIDsMu: sync.RWMutex{}, + logger: logger, + discV5BootstrapNodes: nwakuCfg.Discv5BootstrapNodes, + onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, + onPeerStats: onPeerStats, + onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), + sendQueue: 
publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + }, nil - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) -} - -func (self *Waku) WakuStart() error { - - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStart(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuStop() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStop(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} -func (self *Waku) WakuDestroy() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuDestroy(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) StartDiscV5() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStartDiscV5(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) StopDiscV5() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStopDiscV5(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuVersion() (string, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - - C.cGoWakuVersion(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return version, nil - } - - errMsg := "error WakuVersion: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) } //export globalEventCallback func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { // This is shared among all Golang instances - self := Waku{wakuCtx: userData} - self.MyEventCallback(callerRet, msg, len) + // TODO-nwaku + // self := Waku{wakuCtx: userData} + // self.MyEventCallback(callerRet, msg, len) } func (self *Waku) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { fmt.Println("Event received:", C.GoStringN(msg, C.int(len))) } -func (self *Waku) WakuSetEventCallback() { - // Notice that the events for self node are handled by the 'MyEventCallback' method - C.cGoWakuSetEventCallback(self.wakuCtx) +type response struct { + err error + value any } -func (self *Waku) FormatContentTopic( - appName string, - appVersion int, - contentTopicName string, - encoding string) (WakuContentTopic, error) { - - var cAppName = C.CString(appName) - var cContentTopicName = C.CString(contentTopicName) - var cEncoding = C.CString(encoding) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cAppName)) - defer C.free(unsafe.Pointer(cContentTopicName)) - defer C.free(unsafe.Pointer(cEncoding)) - defer C.freeResp(resp) - - C.cGoWakuContentTopic(self.wakuCtx, - cAppName, - C.int(appVersion), - cContentTopicName, - cEncoding, - resp) - - if C.getRet(resp) == C.RET_OK { - var contentTopic 
= C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return contentTopic, nil - } - - errMsg := "error FormatContentTopic: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - return "", errors.New(errMsg) +// WakuNode represents an instance of an nwaku node +type WakuNode struct { + wakuCtx unsafe.Pointer + cancel context.CancelFunc + requestCh chan *request } -func (self *Waku) FormatPubsubTopic(topicName string) (WakuPubsubTopic, error) { - var cTopicName = C.CString(topicName) - var resp = C.allocResp() +type requestType int - defer C.free(unsafe.Pointer(cTopicName)) - defer C.freeResp(resp) +const ( + requestTypeNew requestType = iota + 1 + requestTypePing + requestTypeStart + requestTypeRelayPublish + requestTypeStoreQuery + requestTypePeerID + requestTypeStop + requestTypeDestroy + requestTypeStartDiscV5 + requestTypeStopDiscV5 + requestTypeVersion + requestTypeRelaySubscribe + requestTypeRelayUnsubscribe + requestTypePeerExchangeRequest + requestTypeConnect + requestTypeDialPeerByID + requestTypeListenAddresses + requestTypeENR + requestTypeListPeersInMesh + requestTypeGetConnectedPeers + requestTypeGetPeerIDsFromPeerStore + requestTypeGetPeerIDsByProtocol + requestTypeDisconnectPeerByID + requestTypeDnsDiscovery + requestTypeDialPeer + requestTypeGetNumConnectedRelayPeers +) - C.cGoWakuPubsubTopic(self.wakuCtx, cTopicName, resp) - if C.getRet(resp) == C.RET_OK { - var pubsubTopic = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return pubsubTopic, nil - } - - errMsg := "error FormatPubsubTopic: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - return "", errors.New(errMsg) +type request struct { + id string + reqType requestType + input any + responseCh chan response } -func (self *Waku) WakuDefaultPubsubTopic() (WakuPubsubTopic, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuDefaultPubsubTopic(self.wakuCtx, resp) - if C.getRet(resp) == C.RET_OK { - var defaultPubsubTopic = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return defaultPubsubTopic, nil +func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { + ctx, cancel := context.WithCancel(ctx) + + n := &WakuNode{ + requestCh: make(chan *request), + cancel: cancel, } - errMsg := "error WakuDefaultPubsubTopic: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + // Notice this runs insto a separate goroutine. This is because we can't be sure + // from which OS thread will go call nwaku operations (They need to be done from + // the same thread that started nwaku). 
Communication with the goroutine to send + // operations to nwaku will be done via channels + go func() { + defer gocommon.LogOnPanic() - return "", errors.New(errMsg) -} + runtime.LockOSThread() + defer runtime.UnlockOSThread() -func (self *Waku) WakuRelaySubscribe(pubsubTopic string) error { - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) + C.waku_setup() - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) + n.processLoop(ctx) + }() - if self.wakuCtx == nil { - return errors.New("wakuCtx is nil") - } - // if self.cPubsubTopic == nil { - // fmt.Println("cPubsubTopic is nil") - // } - // if self.resp == nil { - // fmt.Println("resp is nil") - // } - - C.cGoWakuRelaySubscribe(self.wakuCtx, cPubsubTopic, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuRelaySubscribe: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuRelayUnsubscribe(pubsubTopic string) error { - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - C.cGoWakuRelayUnsubscribe(self.wakuCtx, cPubsubTopic, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuRelayUnsubscribe: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) { - jsonMsg, err := json.Marshal(message) + _, err := n.postTask(requestTypeNew, config) if err != nil { - return "", err + cancel() + return nil, err } - - var cPubsubTopic = C.CString(pubsubTopic) - var msg = C.CString(string(jsonMsg)) - var resp = C.allocResp() - - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - defer C.free(unsafe.Pointer(msg)) - - C.cGoWakuLightpushPublish(self.wakuCtx, cPubsubTopic, msg, resp) - if C.getRet(resp) == C.RET_OK { - msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return msg, nil - } - errMsg := "error WakuLightpushPublish: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + return n, nil } -func wakuStoreQuery( - wakuCtx unsafe.Pointer, - jsonQuery string, - peerAddr string, - timeoutMs int) (string, error) { - - var cJsonQuery = C.CString(jsonQuery) - var cPeerAddr = C.CString(peerAddr) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cJsonQuery)) - defer C.free(unsafe.Pointer(cPeerAddr)) - defer C.freeResp(resp) - - C.cGoWakuStoreQuery(wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) - if C.getRet(resp) == C.RET_OK { - msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return msg, nil +func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { + responseCh := make(chan response) + n.requestCh <- &request{ + reqType: reqType, + input: input, + responseCh: responseCh, } - errMsg := "error WakuStoreQuery: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + response := <-responseCh + if response.err != nil { + return nil, response.err + } + return response.value, nil } -func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - - C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp) - if C.getRet(resp) == C.RET_OK { - numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), 
C.int(C.getMyCharLen(resp))) - numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) - if err != nil { - return 0, err +func (n *WakuNode) processLoop(ctx context.Context) { + for req := range n.requestCh { + switch req.reqType { + case requestTypeNew: + req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} + case requestTypePing: + duration, err := n.pingPeer(req.input.(pingRequest)) + req.responseCh <- response{value: duration, err: err} + case requestTypeStart: + req.responseCh <- response{err: n.start()} + case requestTypeRelayPublish: + hash, err := n.relayPublish(req.input.(relayPublishRequest)) + req.responseCh <- response{value: hash, err: err} + case requestTypeStoreQuery: + results, err := n.storeQuery(req.input.(storeQueryRequest)) + req.responseCh <- response{value: results, err: err} + case requestTypeDestroy: + req.responseCh <- response{err: n.destroy()} + case requestTypePeerID: + peerID, err := n.peerID() + req.responseCh <- response{value: peerID, err: err} + case requestTypeStop: + req.responseCh <- response{err: n.stop()} + case requestTypeStartDiscV5: + req.responseCh <- response{err: n.startDiscV5()} + case requestTypeStopDiscV5: + req.responseCh <- response{err: n.stopDiscV5()} + case requestTypeVersion: + version, err := n.version() + req.responseCh <- response{value: version, err: err} + case requestTypePeerExchangeRequest: + numPeers, err := n.peerExchangeRequest(req.input.(uint64)) + req.responseCh <- response{value: numPeers, err: err} + case requestTypeRelaySubscribe: + req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} + case requestTypeRelayUnsubscribe: + req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} + case requestTypeConnect: + req.responseCh <- response{err: n.connect(req.input.(connectRequest))} + case requestTypeDialPeerByID: + req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} + case requestTypeListenAddresses: + addrs, err := n.listenAddresses() + req.responseCh <- response{value: addrs, err: err} + case requestTypeENR: + enr, err := n.enr() + req.responseCh <- response{value: enr, err: err} + case requestTypeListPeersInMesh: + numPeers, err := n.listPeersInMesh(req.input.(string)) + req.responseCh <- response{value: numPeers, err: err} + case requestTypeGetConnectedPeers: + peers, err := n.getConnectedPeers() + req.responseCh <- response{value: peers, err: err} + case requestTypeGetPeerIDsFromPeerStore: + peers, err := n.getPeerIDsFromPeerStore() + req.responseCh <- response{value: peers, err: err} + case requestTypeGetPeerIDsByProtocol: + peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) + req.responseCh <- response{value: peers, err: err} + case requestTypeDisconnectPeerByID: + req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} + case requestTypeDnsDiscovery: + addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) + req.responseCh <- response{value: addrs, err: err} + case requestTypeDialPeer: + req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} + case requestTypeGetNumConnectedRelayPeers: + numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) 
+ req.responseCh <- response{value: numPeers, err: err} + default: + req.responseCh <- response{err: errors.New("invalid operation")} } - return numRecvPeers, nil } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, fmt.Errorf("WakuPeerExchangeRequest: %s", errMsg) } -func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(peerMultiAddr) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) - - C.cGoWakuConnect(self.wakuCtx, cPeerMultiAddr, C.int(timeoutMs), resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error WakuConnect: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuDialPeer(peerMultiAddr multiaddr.Multiaddr, protocol string, timeoutMs int) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(peerMultiAddr.String()) - var cProtocol = C.CString(protocol) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) - defer C.free(unsafe.Pointer(cProtocol)) - - C.cGoWakuDialPeer(self.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error DialPeer: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error { - var resp = C.allocResp() - var cPeerId = C.CString(peerId.String()) - var cProtocol = C.CString(protocol) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) - defer C.free(unsafe.Pointer(cProtocol)) - - C.cGoWakuDialPeerById(self.wakuCtx, cPeerId, cProtocol, C.int(timeoutMs), resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error DialPeerById: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (self *Waku) WakuDnsDiscovery(entTreeUrl string, nameDnsServer string, timeoutMs int) ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - var cEnrTree = C.CString(entTreeUrl) - var cDnsServer = C.CString(nameDnsServer) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cEnrTree)) - defer C.free(unsafe.Pointer(cDnsServer)) - - C.cGoWakuDnsDiscovery(self.wakuCtx, cEnrTree, cDnsServer, C.int(timeoutMs), resp) - - if C.getRet(resp) == C.RET_OK { - - var addrsRet []multiaddr.Multiaddr - nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - addrss := strings.Split(nodeAddresses, ",") - for _, addr := range addrss { - addr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - - addrsRet = append(addrsRet, addr) - } - - return addrsRet, nil - } - errMsg := "error WakuDnsDiscovery: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - return nil, errors.New(errMsg) -} - -func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuListenAddresses(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - - var addrsRet []multiaddr.Multiaddr - listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - addrss := strings.Split(listenAddresses, ",") - for _, addr := range addrss { - addr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - - addrsRet = append(addrsRet, addr) - } - - return addrsRet, nil - } - errMsg := "error WakuListenAddresses: " + 
- C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - return nil, errors.New(errMsg) -} - -func (self *Waku) ENR() (*enode.Node, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetMyENR(self.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - n, err := enode.Parse(enode.ValidSchemes, enrStr) - if err != nil { - return nil, err - } - return n, nil - } - errMsg := "error WakuGetMyENR: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) -} - -func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) { - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - - C.cGoWakuListPeersInMesh(self.wakuCtx, cPubsubTopic, resp) - - if C.getRet(resp) == C.RET_OK { - numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - numPeers, err := strconv.Atoi(numPeersStr) - if err != nil { - errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() - return 0, errors.New(errMsg) - } - return numPeers, nil - } - errMsg := "error ListPeersInMesh: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, errors.New(errMsg) -} - -func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { +func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string - if len(paramPubsubTopic) == 0 { + if len(optPubsubTopic) == 0 { pubsubTopic = "" } else { - pubsubTopic = paramPubsubTopic[0] + pubsubTopic = optPubsubTopic[0] } var resp = C.allocResp() @@ -2857,7 +2564,7 @@ func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, er defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - C.cGoWakuGetNumConnectedRelayPeers(self.wakuCtx, cPubsubTopic, resp) + C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2873,19 +2580,32 @@ func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, er return 0, errors.New(errMsg) } -func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) { +func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerID.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DisconnectPeerById: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { var resp = C.allocResp() defer C.freeResp(resp) - C.cGoWakuGetConnectedPeers(self.wakuCtx, resp) - + C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - return peer.IDSlice{}, nil + return nil, nil } // peersStr contains a comma-separated list of peer ids itemsPeerIds := strings.Split(peersStr, ",") - var peers peer.IDSlice for _, peerId := range itemsPeerIds { id, err := peer.Decode(peerId) @@ -2894,204 +2614,168 @@ func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) { } peers = append(peers, id) } - return peers, nil } errMsg := 
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) + } -func (self *Waku) GetNumConnectedPeers() (int, error) { - connecterPeers, err := self.GetConnectedPeers() - if err != nil { - return 0, err +func (n *WakuNode) relaySubscribe(pubsubTopic string) error { + if pubsubTopic == "" { + return errors.New("pubsub topic is empty") } - return len(connecterPeers), nil -} -func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + defer C.freeResp(resp) - C.cGoWakuGetPeerIdsFromPeerStore(self.wakuCtx, resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + + C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - return peer.IDSlice{}, nil - } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") - - var peers peer.IDSlice - for _, peerId := range itemsPeerIds { - id, err := peer.Decode(peerId) - if err != nil { - return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) - } - peers = append(peers, id) - } - - return peers, nil + return nil } + + errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { + if pubsubTopic == "" { + return errors.New("pubsub topic is empty") + } + + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + + C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + if C.getRet(resp) == C.RET_OK { + numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) + if err != nil { + return 0, err + } + return numRecvPeers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) + return 0, errors.New(errMsg) } -func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { +func (n *WakuNode) startDiscV5() error { var resp = C.allocResp() - var cProtocol = C.CString(protocol) defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cProtocol)) - - C.cGoWakuGetPeerIdsByProtocol(self.wakuCtx, cProtocol, resp) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - return peer.IDSlice{}, nil - } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") - - var peers peer.IDSlice - for _, p := range itemsPeerIds { - id, err := peer.Decode(p) - if err != nil { - return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) - } - 
peers = append(peers, id) - } - - return peers, nil + return nil } - errMsg := "error GetPeerIdsByProtocol: " + + errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) stopDiscV5() error { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuStopDiscV5(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) version() (string, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuVersion(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return version, nil + } + + errMsg := "error WakuVersion: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", errors.New(errMsg) +} + +func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { + // TODO: extract timeout from context + timeoutMs := time.Minute.Milliseconds() + + b, err := json.Marshal(storeQueryRequest.storeRequest) + if err != nil { + return nil, err + } + + addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs[i] = addr.String() + } + + var cJsonQuery = C.CString(string(b)) + var cPeerAddr = C.CString(strings.Join(addrs, ",")) + var resp = C.allocResp() + + defer C.free(unsafe.Pointer(cJsonQuery)) + defer C.free(unsafe.Pointer(cPeerAddr)) + defer C.freeResp(resp) + + C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + if C.getRet(resp) == C.RET_OK { + jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + storeQueryResponse := &storepb.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse) + if err != nil { + return nil, err + } + return storeQueryResponse, nil + } + errMsg := "error WakuStoreQuery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return nil, errors.New(errMsg) } -// func main() { +func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { + // TODO: extract timeout from context + timeoutMs := time.Minute.Milliseconds() -// config := WakuConfig{ -// Host: "0.0.0.0", -// Port: 30304, -// NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", -// EnableRelay: true, -// LogLevel: "DEBUG", -// } - -// node, err := wakuNew(config) -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// node.WakuSetEventCallback() - -// defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// err = node.WakuRelaySubscribe(defaultPubsubTopic) -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// err = node.WakuConnect( -// // tries to connect to a localhost node with key: 0d714a1fada214dead6dc9c7274585eca0ff292451866e7d6d677dc818e8ccd2 -// "/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN", -// 10000) -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// err = node.WakuStart() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// version, err := 
node.WakuVersion() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// formattedContentTopic, err := node.FormatContentTopic("appName", 1, "cTopicName", "enc") -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// formattedPubsubTopic, err := node.FormatPubsubTopic("my-ctopic") -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// listenAddresses, err := node.WakuListenAddresses() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// fmt.Println("Version:", version) -// fmt.Println("Custom content topic:", formattedContentTopic) -// fmt.Println("Custom pubsub topic:", formattedPubsubTopic) -// fmt.Println("Default pubsub topic:", defaultPubsubTopic) -// fmt.Println("Listen addresses:", listenAddresses) - -// // Wait for a SIGINT or SIGTERM signal -// ch := make(chan os.Signal, 1) -// signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) -// <-ch - -// err = node.WakuStop() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } - -// err = node.WakuDestroy() -// if err != nil { -// fmt.Println("Error happened:", err.Error()) -// return -// } -// } - -// MaxMessageSize returns the maximum accepted message size. -/* TODO-nwaku -func (w *Waku) MaxMessageSize() uint32 { - return w.cfg.MaxMessageSize -} */ - -func newPublisher(wakuCtx unsafe.Pointer) publish.Publisher { - return &nwakuPublisher{ - wakuCtx: wakuCtx, - } -} - -type nwakuPublisher struct { - wakuCtx unsafe.Pointer -} - -func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) { - // TODO-nwaku - return nil, nil -} - -func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - timeoutMs := 1000 - - jsonMsg, err := json.Marshal(message) + jsonMsg, err := json.Marshal(relayPublishRequest.message) if err != nil { return pb.MessageHash{}, err } - var cPubsubTopic = C.CString(pubsubTopic) + var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) var msg = C.CString(string(jsonMsg)) var resp = C.allocResp() @@ -3099,7 +2783,7 @@ func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessa defer C.free(unsafe.Pointer(cPubsubTopic)) defer C.free(unsafe.Pointer(msg)) - C.cGoWakuRelayPublish(p.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHashBytes, err := hexutil.Decode(msgHash) @@ -3108,82 +2792,39 @@ func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessa } return pb.ToMessageHash(msgHashBytes), nil } - errMsg := "error WakuRelayPublish: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return pb.MessageHash{}, errors.New(errMsg) } -// LightpushPublish publishes a message via WakuLightPush -func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) { - // TODO-nwaku - return pb.MessageHash{}, errors.New("not implemented yet") +func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { + var resp = C.allocResp() + var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) + var cDnsServer = 
C.CString(dnsDiscRequest.nameDnsServer) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cEnrTree)) + defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + var addrsRet []multiaddr.Multiaddr + nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + addrss := strings.Split(nodeAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil + } + errMsg := "error WakuDnsDiscovery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) } -func newStorenodeMessageVerifier(wakuCtx unsafe.Pointer) publish.StorenodeMessageVerifier { - return &storenodeMessageVerifier{ - wakuCtx: wakuCtx, - } -} +func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { + peerInfo := request.peerInfo -type storenodeMessageVerifier struct { - wakuCtx unsafe.Pointer -} - -func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { - requestIDStr := hex.EncodeToString(requestID) - storeRequest := &storepb.StoreQueryRequest{ - RequestId: requestIDStr, - MessageHashes: make([][]byte, len(messageHashes)), - IncludeData: false, - PaginationCursor: nil, - PaginationForward: false, - PaginationLimit: proto.Uint64(pageSize), - } - - for i, mhash := range messageHashes { - storeRequest.MessageHashes[i] = mhash.Bytes() - } - - jsonQuery, err := json.Marshal(storeRequest) - if err != nil { - return nil, err - } - - // TODO: timeouts need to be managed differently. For now we're using a 1m timeout - jsonResponse, err := wakuStoreQuery(d.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) - if err != nil { - return nil, err - } - - response := &storepb.StoreQueryResponse{} - err = json.Unmarshal([]byte(jsonResponse), response) - if err != nil { - return nil, err - } - - if response.GetStatusCode() != http.StatusOK { - return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc()) - } - - result := make([]pb.MessageHash, len(response.Messages)) - for i, msg := range response.Messages { - result[i] = pb.ToMessageHash(msg.GetMessageHash()) - } - - return result, nil -} - -type pinger struct { - wakuCtx unsafe.Pointer -} - -func newPinger(wakuCtx unsafe.Pointer) commonapi.Pinger { - return &pinger{ - wakuCtx: wakuCtx, - } -} - -func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { addrs := make([]string, len(peerInfo.Addrs)) for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) 
{ addrs[i] = addr.String() @@ -3194,7 +2835,8 @@ func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Dur defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) - C.cGoWakuPingPeer(p.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + // TODO: extract timeout from ctx + C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) rttInt, err := strconv.ParseInt(rttStr, 10, 64) @@ -3208,156 +2850,502 @@ func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Dur return 0, fmt.Errorf("PingPeer: %s", errMsg) } -type storenodeRequestor struct { - wakuCtx unsafe.Pointer - logger *zap.Logger -} +func (n *WakuNode) start() error { + var resp = C.allocResp() + defer C.freeResp(resp) -func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor { - return &storenodeRequestor{ - wakuCtx: wakuCtx, - logger: logger.Named("storenodeRequestor"), - } -} + C.cGoWakuStart(n.wakuCtx, resp) -func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) { - requestIDStr := hex.EncodeToString(protocol.GenerateRequestID()) - - logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr)) - - logger.Debug("sending store request") - - storeRequest := &storepb.StoreQueryRequest{ - RequestId: requestIDStr, - MessageHashes: make([][]byte, len(messageHashes)), - IncludeData: true, - PaginationCursor: nil, - PaginationForward: false, - PaginationLimit: proto.Uint64(pageSize), - } - - for i, mhash := range messageHashes { - storeRequest.MessageHashes[i] = mhash.Bytes() - } - - jsonQuery, err := json.Marshal(storeRequest) - if err != nil { - return nil, err - } - - // TODO: timeouts need to be managed differently. For now we're using a 1m timeout - jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) - if err != nil { - return nil, err - } - - storeResponse := &storepb.StoreQueryResponse{} - err = json.Unmarshal([]byte(jsonResponse), storeResponse) - if err != nil { - return nil, err - } - - if storeResponse.GetStatusCode() != http.StatusOK { - return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) - } - - return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil -} - -func (s *storenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) { - jsonQuery, err := json.Marshal(storeRequest) - if err != nil { - return nil, err - } - - // TODO: timeouts need to be managed differently. 
For now we're using a 1m timeout - jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) - if err != nil { - return nil, err - } - - storeResponse := &storepb.StoreQueryResponse{} - err = json.Unmarshal([]byte(jsonResponse), storeResponse) - if err != nil { - return nil, err - } - - if storeResponse.GetStatusCode() != http.StatusOK { - return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) - } - - return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil -} - -type storeResultImpl struct { - done bool - - wakuCtx unsafe.Pointer - storeRequest *storepb.StoreQueryRequest - storeResponse *storepb.StoreQueryResponse - peerID peer.ID -} - -func newStoreResultImpl(wakuCtx unsafe.Pointer, peerID peer.ID, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl { - return &storeResultImpl{ - wakuCtx: wakuCtx, - storeRequest: storeRequest, - storeResponse: storeResponse, - peerID: peerID, - } -} - -func (r *storeResultImpl) Cursor() []byte { - return r.storeResponse.GetPaginationCursor() -} - -func (r *storeResultImpl) IsComplete() bool { - return r.done -} - -func (r *storeResultImpl) PeerID() peer.ID { - return r.peerID -} - -func (r *storeResultImpl) Query() *storepb.StoreQueryRequest { - return r.storeRequest -} - -func (r *storeResultImpl) Response() *storepb.StoreQueryResponse { - return r.storeResponse -} - -func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error { - // TODO: opts is being ignored. Will require some changes in go-waku. For now using this - // is not necessary - - if r.storeResponse.GetPaginationCursor() == nil { - r.done = true + if C.getRet(resp) == C.RET_OK { return nil } - r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) - r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor + errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} - jsonQuery, err := json.Marshal(r.storeRequest) +func (n *WakuNode) stop() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuStop(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) destroy() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuDestroy(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) peerID() (peer.ID, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuGetMyPeerId(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + id, err := peer.Decode(peerIdStr) + if err != nil { + errMsg := "WakuGetMyPeerId - decoding peerId: %w" + return "", fmt.Errorf(errMsg, err) + } + return id, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", errors.New(errMsg) +} + +func (n *WakuNode) connect(connReq connectRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(connReq.addr.String()) + defer C.freeResp(resp) + defer 
C.free(unsafe.Pointer(cPeerMultiAddr)) + + // TODO: extract timeout from ctx + C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuConnect: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { + var resp = C.allocResp() + var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) + var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + defer C.free(unsafe.Pointer(cProtocol)) + + // TODO: extract timeout from ctx + C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + var addrsRet []multiaddr.Multiaddr + listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + addrss := strings.Split(listenAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil + } + errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) enr() (*enode.Node, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + n, err := enode.Parse(enode.ValidSchemes, enrStr) + if err != nil { + return nil, err + } + return n, nil + } + errMsg := "error WakuGetMyENR: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + C.cGoWakuListPeersInMesh(n.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + return 0, errors.New(errMsg) + } + return numPeers, nil + } + errMsg := "error ListPeersInMesh: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, errors.New(errMsg) +} + +func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding 
peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) +} + +func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { + var resp = C.allocResp() + var cProtocol = C.CString(string(protocolID)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, p := range itemsPeerIds { + id, err := peer.Decode(p) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil + } + errMsg := "error GetPeerIdsByProtocol: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) +} + +func (n *WakuNode) newNode(config *WakuConfig) error { + jsonConfig, err := json.Marshal(config) if err != nil { return err } - // TODO: timeouts need to be managed differently. For now we're using a 1m timeout - jsonResponse, err := wakuStoreQuery(r.wakuCtx, string(jsonQuery), r.peerID.String(), int(time.Minute.Milliseconds())) - if err != nil { - return err + var cJsonConfig = C.CString(string(jsonConfig)) + var resp = C.allocResp() + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) } - err = json.Unmarshal([]byte(jsonResponse), r.storeResponse) - if err != nil { - return err - } + wakuCtx := C.cGoWakuNew(cJsonConfig, resp) + n.wakuCtx = unsafe.Pointer(wakuCtx) + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) return nil } -func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue { - return r.storeResponse.GetMessages() +func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) + var cProtocol = C.CString(string(dialPeerReq.protocol)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + // TODO: extract timeout from context + C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +type pingRequest struct { + ctx context.Context + peerInfo peer.AddrInfo +} + +func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { + response, err := n.postTask(requestTypePing, pingRequest{ + ctx: ctx, + peerInfo: info, + }) + if err != nil { + return 0, err + } + return response.(time.Duration), nil +} + +func (n *WakuNode) Start() error { + _, err := n.postTask(requestTypeStart, nil) + return err +} + +type relayPublishRequest struct { + ctx context.Context + pubsubTopic string + message *pb.WakuMessage +} + +func (n *WakuNode) RelayPublish(ctx 
context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ + ctx: ctx, + pubsubTopic: pubsubTopic, + message: message, + }) + if err != nil { + return pb.MessageHash{}, err + } + return response.(pb.MessageHash), nil +} + +type storeQueryRequest struct { + ctx context.Context + storeRequest *storepb.StoreQueryRequest + peerInfo peer.AddrInfo +} + +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { + response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ + ctx: ctx, + peerInfo: peerInfo, + storeRequest: storeRequest, + }) + if err != nil { + return nil, err + } + return response.(*storepb.StoreQueryResponse), nil +} + +func (n *WakuNode) PeerID() (peer.ID, error) { + response, err := n.postTask(requestTypePeerID, nil) + if err != nil { + return "", err + } + return response.(peer.ID), nil +} + +func (n *WakuNode) Stop() error { + _, err := n.postTask(requestTypeStop, nil) + return err +} + +func (n *WakuNode) Destroy() error { + _, err := n.postTask(requestTypeDestroy, nil) + return err +} + +func (n *WakuNode) StartDiscV5() error { + _, err := n.postTask(requestTypeStartDiscV5, nil) + return err +} + +func (n *WakuNode) StopDiscV5() error { + _, err := n.postTask(requestTypeStopDiscV5, nil) + return err +} + +func (n *WakuNode) Version() (string, error) { + response, err := n.postTask(requestTypeVersion, nil) + if err != nil { + return "", err + } + return response.(string), nil +} + +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { + _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) + return err +} + +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { + _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) + return err +} + +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { + response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) + if err != nil { + return 0, err + } + return response.(uint64), nil +} + +type connectRequest struct { + ctx context.Context + addr multiaddr.Multiaddr +} + +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + _, err := n.postTask(requestTypeConnect, connectRequest{ + ctx: ctx, + addr: addr, + }) + return err +} + +type dialPeerByIDRequest struct { + ctx context.Context + peerID peer.ID + protocol libp2pproto.ID +} + +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { + _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ + ctx: ctx, + peerID: peerID, + protocol: protocol, + }) + return err +} + +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + response, err := n.postTask(requestTypeListenAddresses, nil) + if err != nil { + return nil, err + } + return response.([]multiaddr.Multiaddr), nil +} + +func (n *WakuNode) ENR() (*enode.Node, error) { + response, err := n.postTask(requestTypeENR, nil) + if err != nil { + return nil, err + } + return response.(*enode.Node), nil +} + +func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { + response, err := n.postTask(requestTypeListPeersInMesh, pubsubTopic) + if err != nil { + return 0, err + } + return response.(int), nil +} + +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetConnectedPeers, nil) + if err != nil { + return nil, 
err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) GetNumConnectedPeers() (int, error) { + peers, err := n.GetConnectedPeers() + if err != nil { + return 0, err + } + return len(peers), nil +} + +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { + _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) + return err +} + +type dnsDiscoveryRequest struct { + ctx context.Context + enrTreeUrl string + nameDnsServer string +} + +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { + response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ + ctx: ctx, + enrTreeUrl: enrTreeUrl, + nameDnsServer: nameDnsServer, + }) + if err != nil { + return nil, err + } + return response.([]multiaddr.Multiaddr), nil +} + +type dialPeerRequest struct { + ctx context.Context + peerAddr multiaddr.Multiaddr + protocol libp2pproto.ID +} + +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ + ctx: ctx, + peerAddr: peerAddr, + protocol: protocol, + }) + return err +} + +func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { + response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) + if err != nil { + return 0, err + } + return response.(int), nil } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 0f4f58ea5..b57c8fa2c 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -12,9 +12,13 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/api/history" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -24,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/wakuv2/common" ) var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" @@ -150,6 +155,28 @@ func parseNodes(rec []string) []*enode.Node { return ns } +type testStorenodeConfigProvider struct { + storenode peer.AddrInfo +} + +func (t *testStorenodeConfigProvider) UseStorenodes() (bool, error) { + return true, nil +} + +func (t *testStorenodeConfigProvider) GetPinnedStorenode() (peer.AddrInfo, error) { + return peer.AddrInfo{}, nil +} + +func (t *testStorenodeConfigProvider) Storenodes() ([]peer.AddrInfo, error) { + return []peer.AddrInfo{t.storenode}, nil +} + +func newTestStorenodeConfigProvider(storenode peer.AddrInfo) history.StorenodeConfigProvider { + return &testStorenodeConfigProvider{ + storenode: storenode, + } +} + // In order to run 
these tests, you must run an nwaku node // // Using Docker: @@ -187,7 +214,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Start()) - enr, err := w.ENR() + enr, err := w.node.ENR() require.NoError(t, err) require.NotNil(t, enr) @@ -197,7 +224,7 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - numConnected, err := w.GetNumConnectedRelayPeers() + numConnected, err := w.node.GetNumConnectedPeers() if err != nil { return err } @@ -212,96 +239,102 @@ func TestBasicWakuV2(t *testing.T) { // Get local store node address storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - require.NoError(t, err) + + for i := 0; i <= 100; i++ { + time.Sleep(2 * time.Second) + } + + w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) // Check that we are indeed connected to the store node - connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") // Disconnect from the store node - err = w.DropPeer(storeNode.ID) + err = w.node.DisconnectPeerByID(storeNode.ID) require.NoError(t, err) // Check that we are indeed disconnected - connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") - storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) - require.NoError(t, err) - // Re-connect - err = w.DialPeer(storeNodeMultiadd) + err = w.DialPeerByID(storeNode.ID) require.NoError(t, err) - time.Sleep(1 * time.Second) - // Check that we are connected again - connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") - /* - filter := &common.Filter{ - PubsubTopic: config.DefaultShardPubsubTopic, - Messages: common.NewMemoryMessageStore(), - ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), - } + filter := &common.Filter{ + PubsubTopic: w.cfg.DefaultShardPubsubTopic, + Messages: common.NewMemoryMessageStore(), + ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), + } - _, err = w.Subscribe(filter) - require.NoError(t, err) + _, err = w.Subscribe(filter) + require.NoError(t, err) - msgTimestamp := w.timestamp() - contentTopic := maps.Keys(filter.ContentTopics)[0] + msgTimestamp := w.timestamp() + contentTopic := maps.Keys(filter.ContentTopics)[0] - time.Sleep(2 * time.Second) + time.Sleep(2 * time.Second) - _, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: contentTopic.ContentTopic(), - Version: proto.Uint32(0), - Timestamp: &msgTimestamp, - }, nil) + msgID, err := w.Send(w.cfg.DefaultShardPubsubTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: contentTopic.ContentTopic(), + Version: proto.Uint32(0), + 
Timestamp: &msgTimestamp, + }, nil) - require.NoError(t, err) + require.NoError(t, err) + require.NotEqual(t, msgID, "1") - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - messages := filter.Retrieve() - require.Len(t, messages, 1) + messages := filter.Retrieve() + require.Len(t, messages, 1) - timestampInSeconds := msgTimestamp / int64(time.Second) - marginInSeconds := 20 + timestampInSeconds := msgTimestamp / int64(time.Second) + marginInSeconds := 20 - options = func(b *backoff.ExponentialBackOff) { - b.MaxElapsedTime = 60 * time.Second - b.InitialInterval = 500 * time.Millisecond - } - err = tt.RetryWithBackOff(func() error { - _, envelopeCount, err := w.Query( - context.Background(), - storeNode.PeerID, - store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()), - TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), - TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), - }, - nil, - nil, - false, - ) - if err != nil || envelopeCount == 0 { - // in case of failure extend timestamp margin up to 40secs - if marginInSeconds < 40 { - marginInSeconds += 5 - } - return errors.New("no messages received from store node") + options = func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 60 * time.Second + b.InitialInterval = 500 * time.Millisecond + } + err = tt.RetryWithBackOff(func() error { + err := w.HistoryRetriever.Query( + context.Background(), + store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(w.cfg.DefaultShardPubsubTopic, contentTopic.ContentTopic()), + TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), + TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + }, + *storeNode, + 10, + nil, false, + ) + + return err + + // TODO-nwaku + /*if err != nil || envelopeCount == 0 { + // in case of failure extend timestamp margin up to 40secs + if marginInSeconds < 40 { + marginInSeconds += 5 } - return nil - }, options) - require.NoError(t, err) */ + return errors.New("no messages received from store node") + } + return nil*/ + + }, options) + require.NoError(t, err) + + time.Sleep(10 * time.Second) require.NoError(t, w.Stop()) } @@ -356,10 +389,10 @@ func TestPeerExchange(t *testing.T) { time.Sleep(1 * time.Second) - discV5NodePeerId, err := discV5Node.PeerID() + discV5NodePeerId, err := discV5Node.node.PeerID() require.NoError(t, err) - discv5NodeEnr, err := discV5Node.ENR() + discv5NodeEnr, err := discV5Node.node.ENR() require.NoError(t, err) pxServerConfig := Config{ @@ -387,7 +420,7 @@ func TestPeerExchange(t *testing.T) { // Adding an extra second to make sure PX cache is not empty time.Sleep(2 * time.Second) - serverNodeMa, err := pxServerNode.ListenAddresses() + serverNodeMa, err := pxServerNode.node.ListenAddresses() require.NoError(t, err) require.NotNil(t, serverNodeMa) @@ -398,7 +431,7 @@ func TestPeerExchange(t *testing.T) { // Check that pxServerNode has discV5Node in its Peer Store err = tt.RetryWithBackOff(func() error { - peers, err := pxServerNode.GetPeerIdsFromPeerStore() + peers, err := pxServerNode.node.GetPeerIDsFromPeerStore() if err != nil { return err @@ -436,12 +469,12 @@ func TestPeerExchange(t *testing.T) { time.Sleep(1 * time.Second) - pxServerPeerId, err := pxServerNode.PeerID() + pxServerPeerId, err := pxServerNode.node.PeerID() require.NoError(t, err) // Check that the light node discovered the discV5Node and 
has both nodes in its peer store err = tt.RetryWithBackOff(func() error { - peers, err := lightNode.GetPeerIdsFromPeerStore() + peers, err := lightNode.node.GetPeerIDsFromPeerStore() if err != nil { return err } @@ -455,7 +488,7 @@ func TestPeerExchange(t *testing.T) { // Now perform the PX request manually to see if it also works err = tt.RetryWithBackOff(func() error { - numPeersReceived, err := lightNode.WakuPeerExchangeRequest(1) + numPeersReceived, err := lightNode.node.PeerExchangeRequest(1) if err != nil { return err } @@ -551,88 +584,6 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, discV5Node.Stop()) */ } -func TestDial(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - - dialerNodeConfig := Config{ - UseThrottledPublish: true, - ClusterID: 16, - } - - // start node that will initiate the dial - dialerNodeWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", - Discv5Discovery: false, - ClusterID: 16, - Shards: []uint16{64}, - Discv5UdpPort: 9020, - TcpPort: 60020, - } - - dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil) - require.NoError(t, err) - require.NoError(t, dialerNode.Start()) - - time.Sleep(1 * time.Second) - - receiverNodeConfig := Config{ - UseThrottledPublish: true, - ClusterID: 16, - } - - // start node that will receive the dial - receiverNodeWakuConfig := WakuConfig{ - EnableRelay: true, - LogLevel: "DEBUG", - Discv5Discovery: false, - ClusterID: 16, - Shards: []uint16{64}, - Discv5UdpPort: 9021, - TcpPort: 60021, - } - - receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) - require.NoError(t, err) - require.NoError(t, receiverNode.Start()) - - time.Sleep(1 * time.Second) - - receiverMultiaddr, err := receiverNode.ListenAddresses() - require.NoError(t, err) - require.NotNil(t, receiverMultiaddr) - - // Check that both nodes start with no connected peers - dialerPeerCount, err := dialerNode.PeerCount() - require.NoError(t, err) - require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") - - receiverPeerCount, err := receiverNode.PeerCount() - require.NoError(t, err) - require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") - - // Dial - err = dialerNode.DialPeer(receiverMultiaddr[0]) - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - // Check that both nodes now have one connected peer - dialerPeerCount, err = dialerNode.PeerCount() - require.NoError(t, err) - require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") - - receiverPeerCount, err = receiverNode.PeerCount() - require.NoError(t, err) - require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") - - // Stop nodes - require.NoError(t, dialerNode.Stop()) - require.NoError(t, receiverNode.Stop()) - -} - func TestDnsDiscover(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) @@ -655,15 +606,80 @@ func TestDnsDiscover(t *testing.T) { time.Sleep(1 * time.Second) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" - res, err := node.WakuDnsDiscovery(sampleEnrTree, nodeConfig.Nameserver, int(requestTimeout/time.Millisecond)) + ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + defer cancel() + res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nodeConfig.Nameserver) require.NoError(t, err) - 
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query") - // Stop nodes require.NoError(t, node.Stop()) } +func TestDial(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + dialerNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + // start node that will initiate the dial + dialerNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9020, + TcpPort: 60020, + } + dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, dialerNode.Start()) + time.Sleep(1 * time.Second) + receiverNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + // start node that will receive the dial + receiverNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9021, + TcpPort: 60021, + } + receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, receiverNode.Start()) + time.Sleep(1 * time.Second) + receiverMultiaddr, err := receiverNode.node.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, receiverMultiaddr) + // Check that both nodes start with no connected peers + dialerPeerCount, err := dialerNode.PeerCount() + require.NoError(t, err) + require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") + receiverPeerCount, err := receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") + // Dial + err = dialerNode.DialPeer(receiverMultiaddr[0]) + require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that both nodes now have one connected peer + dialerPeerCount, err = dialerNode.PeerCount() + require.NoError(t, err) + require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") + receiverPeerCount, err = receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") + // Stop nodes + require.NoError(t, dialerNode.Stop()) + require.NoError(t, receiverNode.Stop()) +} + /* func TestWakuV2Filter(t *testing.T) { diff --git a/wakuv2/pinger.go b/wakuv2/pinger.go new file mode 100644 index 000000000..ef5f14727 --- /dev/null +++ b/wakuv2/pinger.go @@ -0,0 +1,26 @@ +//go:build use_nwaku +// +build use_nwaku + +package wakuv2 + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" +) + +type pinger struct { + node *WakuNode +} + +func newPinger(node *WakuNode) commonapi.Pinger { + return &pinger{ + node: node, + } +} + +func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { + return p.node.PingPeer(ctx, peerInfo) +} diff --git a/wakuv2/publisher.go b/wakuv2/publisher.go new file mode 100644 index 000000000..52ffd3e35 --- /dev/null +++ b/wakuv2/publisher.go @@ -0,0 +1,37 @@ +//go:build use_nwaku +// +build use_nwaku + +package wakuv2 + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/api/publish" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +type nwakuPublisher struct { + node *WakuNode +} + +func 
newPublisher(node *WakuNode) publish.Publisher { + return &nwakuPublisher{ + node: node, + } +} + +func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) { + // TODO-nwaku + return nil, nil +} + +func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + return p.node.RelayPublish(ctx, message, pubsubTopic) +} + +// LightpushPublish publishes a message via WakuLightPush +func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) { + // TODO-nwaku + return pb.MessageHash{}, nil +} diff --git a/wakuv2/result.go b/wakuv2/result.go new file mode 100644 index 000000000..0e7dc1a9d --- /dev/null +++ b/wakuv2/result.go @@ -0,0 +1,77 @@ +//go:build use_nwaku +// +build use_nwaku + +package wakuv2 + +import ( + "context" + "encoding/hex" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type storeResultImpl struct { + done bool + + node *WakuNode + storeRequest *storepb.StoreQueryRequest + storeResponse *storepb.StoreQueryResponse + peerInfo peer.AddrInfo +} + +func newStoreResultImpl(node *WakuNode, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl { + return &storeResultImpl{ + node: node, + storeRequest: storeRequest, + storeResponse: storeResponse, + peerInfo: peerInfo, + } +} + +func (r *storeResultImpl) Cursor() []byte { + return r.storeResponse.GetPaginationCursor() +} + +func (r *storeResultImpl) IsComplete() bool { + return r.done +} + +func (r *storeResultImpl) PeerID() peer.ID { + return r.peerInfo.ID +} + +func (r *storeResultImpl) Query() *storepb.StoreQueryRequest { + return r.storeRequest +} + +func (r *storeResultImpl) Response() *storepb.StoreQueryResponse { + return r.storeResponse +} + +func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error { + // TODO: opts is being ignored. Will require some changes in go-waku. 
For now using this + // is not necessary + + if r.storeResponse.GetPaginationCursor() == nil { + r.done = true + return nil + } + + r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) + r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor + + storeResponse, err := r.node.StoreQuery(ctx, r.storeRequest, r.peerInfo) + if err != nil { + return err + } + + r.storeResponse = storeResponse + return nil +} + +func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue { + return r.storeResponse.GetMessages() +} diff --git a/wakuv2/storenode_message_verifier.go b/wakuv2/storenode_message_verifier.go new file mode 100644 index 000000000..f0a7cb2ed --- /dev/null +++ b/wakuv2/storenode_message_verifier.go @@ -0,0 +1,59 @@ +//go:build use_nwaku +// +build use_nwaku + +package wakuv2 + +import ( + "context" + "encoding/hex" + "fmt" + "net/http" + + "github.com/golang/protobuf/proto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/api/publish" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type storenodeMessageVerifier struct { + node *WakuNode +} + +func newStorenodeMessageVerifier(node *WakuNode) publish.StorenodeMessageVerifier { + return &storenodeMessageVerifier{ + node: node, + } +} + +func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { + requestIDStr := hex.EncodeToString(requestID) + storeRequest := &storepb.StoreQueryRequest{ + RequestId: requestIDStr, + MessageHashes: make([][]byte, len(messageHashes)), + IncludeData: false, + PaginationCursor: nil, + PaginationForward: false, + PaginationLimit: proto.Uint64(pageSize), + } + + for i, mhash := range messageHashes { + storeRequest.MessageHashes[i] = mhash.Bytes() + } + + response, err := d.node.StoreQuery(ctx, storeRequest, peerInfo) + if err != nil { + return nil, err + } + + if response.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc()) + } + + result := make([]pb.MessageHash, len(response.Messages)) + for i, msg := range response.Messages { + result[i] = pb.ToMessageHash(msg.GetMessageHash()) + } + + return result, nil +} diff --git a/wakuv2/storenode_requestor.go b/wakuv2/storenode_requestor.go new file mode 100644 index 000000000..b623a6e8e --- /dev/null +++ b/wakuv2/storenode_requestor.go @@ -0,0 +1,76 @@ +//go:build use_nwaku +// +build use_nwaku + +package wakuv2 + +import ( + "context" + "encoding/hex" + "fmt" + "net/http" + + "github.com/golang/protobuf/proto" + "github.com/libp2p/go-libp2p/core/peer" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "go.uber.org/zap" +) + +type storenodeRequestor struct { + node *WakuNode + logger *zap.Logger +} + +func newStorenodeRequestor(node *WakuNode, logger *zap.Logger) commonapi.StorenodeRequestor { + return &storenodeRequestor{ + node: node, + logger: logger.Named("storenodeRequestor"), + } +} + +func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) { + requestIDStr := 
hex.EncodeToString(protocol.GenerateRequestID()) + + logger := s.logger.With(zap.Stringer("peerID", peerInfo.ID), zap.String("requestID", requestIDStr)) + + logger.Debug("sending store request") + + storeRequest := &storepb.StoreQueryRequest{ + RequestId: requestIDStr, + MessageHashes: make([][]byte, len(messageHashes)), + IncludeData: true, + PaginationCursor: nil, + PaginationForward: false, + PaginationLimit: proto.Uint64(pageSize), + } + + for i, mhash := range messageHashes { + storeRequest.MessageHashes[i] = mhash.Bytes() + } + + storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo) + if err != nil { + return nil, err + } + + if storeResponse.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) + } + + return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil +} + +func (s *storenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) { + storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo) + if err != nil { + return nil, err + } + + if storeResponse.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) + } + + return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil +}
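
Note (not part of the patch): the refactor routes every exported WakuNode method (PingPeer, RelayPublish, StoreQuery, Version, ...) through postTask, which hands a typed request to a single processing goroutine that performs the actual libwaku cgo call and replies on responseCh. The sketch below illustrates that pattern in plain Go, with no cgo. Only postTask, the requestType* constants, the response{value, err} shape, and responseCh are taken from the patch; the taskCh field, the postRequest struct, and the loop body here are assumptions made up for illustration, not the real nwaku.go implementation.

package main

import (
	"errors"
	"fmt"
)

// requestType enumerates the operations the processing loop understands
// (standing in for the requestType* constants referenced in the patch).
type requestType int

const (
	requestTypeVersion requestType = iota
	requestTypePing
)

// response carries either a value or an error back to the caller,
// matching the response{value: ..., err: ...} writes seen in the patch.
type response struct {
	value any
	err   error
}

// postRequest pairs a typed request with the channel its result is sent on.
// (Hypothetical name; the patch does not show this struct.)
type postRequest struct {
	reqType    requestType
	input      any
	responseCh chan response
}

// WakuNode here is a stripped-down stand-in: it owns only the task channel,
// not the libwaku context the real struct wraps.
type WakuNode struct {
	taskCh chan postRequest
}

func NewWakuNode() *WakuNode {
	n := &WakuNode{taskCh: make(chan postRequest)}
	go n.processLoop()
	return n
}

// processLoop serializes all "libwaku" calls on one goroutine, dispatching
// on the request type and replying on responseCh, as the patch's loop does.
func (n *WakuNode) processLoop() {
	for req := range n.taskCh {
		switch req.reqType {
		case requestTypeVersion:
			// The real code would call the cgo-backed n.version() here.
			req.responseCh <- response{value: "v0.0.0-sketch"}
		default:
			req.responseCh <- response{err: errors.New("invalid operation")}
		}
	}
}

// postTask sends a request to the loop and blocks until the reply arrives.
func (n *WakuNode) postTask(reqType requestType, input any) (any, error) {
	responseCh := make(chan response, 1)
	n.taskCh <- postRequest{reqType: reqType, input: input, responseCh: responseCh}
	resp := <-responseCh
	return resp.value, resp.err
}

// Version has the same shape as the exported wrappers in the patch:
// post the task, then type-assert the result.
func (n *WakuNode) Version() (string, error) {
	v, err := n.postTask(requestTypeVersion, nil)
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

func main() {
	node := NewWakuNode()
	v, err := node.Version()
	fmt.Println(v, err)
}

Usage-wise, the pattern keeps all cgo interaction confined to one goroutine, which is why the new pinger, publisher, storenode_requestor, and storenode_message_verifier wrappers in this patch can simply delegate to WakuNode methods instead of holding the raw wakuCtx pointer themselves.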