From 4fa26147e8fc7f41f3818f88510d08a51a7724f5 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 8 Oct 2024 14:25:21 +0300 Subject: [PATCH] commenting a lot of things and some config changes to start testing --- wakuv2/api.go | 24 +------- wakuv2/api_test.go | 12 +--- wakuv2/message_publishing.go | 18 +----- wakuv2/nwaku.go | 109 +++++++++++++++++------------------ wakuv2/nwaku_test.go | 44 ++++++++------ 5 files changed, 86 insertions(+), 121 deletions(-) diff --git a/wakuv2/api.go b/wakuv2/api.go index f106b32f5..53f9f2151 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,27 +18,7 @@ package wakuv2 -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "sync" - "time" - - "github.com/waku-org/go-waku/waku/v2/payload" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - - "github.com/status-im/status-go/wakuv2/common" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" - - "google.golang.org/protobuf/proto" -) - +/* // List of errors var ( ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key") @@ -513,4 +493,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} +} */ diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 7a060bf5f..7119650ba 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -18,16 +18,7 @@ package wakuv2 -import ( - "testing" - "time" - - "golang.org/x/exp/maps" - - "github.com/status-im/status-go/wakuv2/common" -) - -func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { +/* func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { w, err := New(nil, "", nil, nil, nil, nil, nil, nil) if err != nil { t.Fatalf("Error creating WakuV2 client: %v", err) @@ -68,3 +59,4 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { t.Fatalf("Could not find filter with both topics") } } +*/ \ No newline at end of file diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 25f8f57d8..49051e55c 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,19 +1,5 @@ package wakuv2 -import ( - "errors" - - "go.uber.org/zap" - - "github.com/waku-org/go-waku/waku/v2/api/publish" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/status-im/status-go/wakuv2/common" -) - type PublishMethod int const ( @@ -34,7 +20,7 @@ func (pm PublishMethod) String() string { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. -func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { +/* func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { pubsubTopic = w.GetPubsubTopic(pubsubTopic) if w.protectedTopicStore != nil { privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) @@ -160,4 +146,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.Pu }) } } -} +} */ diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c5234ee78..ac6d82601 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -269,7 +269,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/rpc" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" @@ -350,7 +349,7 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { return subscription } -func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { +/* func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} var result []*enode.Node @@ -394,11 +393,12 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) wg.Wait() return result, nil -} +} */ type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) -func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { +// This should be handled by nwaku? +/* func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -439,9 +439,10 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA wg.Wait() return nil -} +} */ -func (w *Waku) discoverAndConnectPeers() { +// This too? nwaku? +/* func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { @@ -475,7 +476,7 @@ func (w *Waku) discoverAndConnectPeers() { go w.connect(*peerInfo, nil, wps.Static) } } -} +} */ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { // Connection will be prunned eventually by the connection manager if needed @@ -484,7 +485,7 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi w.WakuConnect(addr.String(), 1000) } -func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { +/* func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.wg.Add(1) defer w.wg.Done() @@ -571,22 +572,22 @@ func (w *Waku) runPeerExchangeLoop() { } } } -} +} */ -func (w *Waku) GetPubsubTopic(topic string) string { +/* func (w *Waku) GetPubsubTopic(topic string) string { if topic == "" { topic = w.cfg.DefaultShardPubsubTopic } return topic -} +} */ // CurrentTime returns current time. func (w *Waku) CurrentTime() time.Time { return w.timesource.Now() } -// APIs returns the RPC descriptors the Waku implementation offers +/* // APIs returns the RPC descriptors the Waku implementation offers func (w *Waku) APIs() []rpc.API { return []rpc.API{ { @@ -596,7 +597,7 @@ func (w *Waku) APIs() []rpc.API { Public: false, }, } -} +} */ // Protocols returns the waku sub-protocols ran by this particular client. func (w *Waku) Protocols() []p2p.Protocol { @@ -851,7 +852,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Waku) Subscribe(f *common.Filter) (string, error) { +/* func (w *Waku) Subscribe(f *common.Filter) (string, error) { f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { @@ -878,7 +879,7 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error { } return nil -} +} */ // GetFilter returns the filter by id. func (w *Waku) GetFilter(id string) *common.Filter { @@ -897,7 +898,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error { return nil } -func (w *Waku) SkipPublishToTopic(value bool) { +/* func (w *Waku) SkipPublishToTopic(value bool) { w.cfg.SkipPublishToTopic = value } @@ -906,7 +907,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { return } w.messageSentCheck.DeleteByMessageIDs(hashes) -} +} */ func (w *Waku) SetStorePeerID(peerID peer.ID) { if w.messageSentCheck != nil { @@ -999,10 +1000,10 @@ func (w *Waku) Query(ctx context.Context, return nil, 0, nil } -// OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. +/* // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) -} +} */ // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. @@ -1188,7 +1189,7 @@ func (w *Waku) checkForConnectionChanges() { // }() // } -func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { +/* func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { w.poolMu.Lock() defer w.poolMu.Unlock() return w.envelopeCache.Has(gethcommon.Hash(mh)), nil @@ -1200,9 +1201,9 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s } w.missingMsgVerifier.SetCriteriaInterest(peerID, protocol.NewContentFilter(pubsubTopic, contentTopics...)) -} +} */ -func (w *Waku) setupRelaySubscriptions() error { +/* func (w *Waku) setupRelaySubscriptions() error { if w.cfg.LightClient { return nil } @@ -1234,9 +1235,9 @@ func (w *Waku) setupRelaySubscriptions() error { } return nil -} +} */ -func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { +/* func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil } @@ -1323,9 +1324,9 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) // postEvent queues the message for further processing. func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope -} +} */ -// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. +/* // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { if w.ctx == nil { return @@ -1379,7 +1380,7 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { Hash: e.Hash(), Event: common.EventEnvelopeAvailable, }) -} +} */ // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. @@ -1421,7 +1422,7 @@ func (w *Waku) Peers() types.PeerStats { // return FormatPeerStats(w.node) } -func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { +/* func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { if w.cfg.LightClient { return nil, errors.New("only available in relay mode") } @@ -1483,7 +1484,7 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error { } return w.protectedTopicStore.Delete(topic) -} +} */ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers @@ -1497,7 +1498,7 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { // } } -func (w *Waku) ConnectionChanged(state connection.State) { +/* func (w *Waku) ConnectionChanged(state connection.State) { isOnline := !state.Offline if w.cfg.LightClient { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 @@ -1519,7 +1520,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { w.onlineChecker.SetOnline(isOnline) } w.state = state -} +} */ func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { // peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) @@ -1667,15 +1668,17 @@ type WakuPubsubTopic = string type WakuContentTopic = string type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery string `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + DiscV5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` } -var jamon unsafe.Pointer - type Waku struct { wakuCtx unsafe.Pointer @@ -1710,7 +1713,8 @@ type Waku struct { cancel context.CancelFunc wg sync.WaitGroup - cfg *Config + // cfg *Config + cfg *WakuConfig options []node.WakuNodeOption envelopeFeed event.Feed @@ -1769,20 +1773,12 @@ func printStackTrace() { func wakuNew(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { - nwakuConfig := WakuConfig{ - Host: cfg.Host, - Port: 30303, - NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", - EnableRelay: true, - LogLevel: "DEBUG", - } - var err error if logger == nil { logger, err = zap.NewDevelopment() @@ -1794,18 +1790,20 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } - cfg = setDefaults(cfg) + /* cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err - } + } */ ctx, cancel := context.WithCancel(context.Background()) - jsonConfig, err := json.Marshal(nwakuConfig) + jsonConfig, err := json.Marshal(cfg) if err != nil { return nil, err } + fmt.Println("-------- CREATING CONFIG, jsonConfig: ", string(jsonConfig)) + var cJsonConfig = C.CString(string(jsonConfig)) var resp = C.allocResp() @@ -1813,7 +1811,6 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, defer C.freeResp(resp) wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - jamon = wakuCtx // Notice that the events for self node are handled by the 'MyEventCallback' method if C.getRet(resp) == C.RET_OK { @@ -1841,7 +1838,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), }, nil } @@ -2386,14 +2383,14 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { // } // MaxMessageSize returns the maximum accepted message size. -func (w *Waku) MaxMessageSize() uint32 { +/* func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize -} +} */ // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 9ef5a6422..be33fa890 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -5,11 +5,8 @@ package wakuv2 import ( "context" - "crypto/rand" "errors" "fmt" - "math/big" - "os" "testing" "time" @@ -21,16 +18,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/dnsdisc" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/wakuv2/common" ) var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" @@ -174,7 +163,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) fmt.Println("---------- GABRIEL 2 ----------") - // Creating a fake DNS Discovery ENRTree + /* // Creating a fake DNS Discovery ENRTree tree, url := makeTestTree("n", parseNodes([]string{nwakuInfo.EnrUri}), nil) fmt.Println("---------- GABRIEL 3 ----------") enrTreeAddress := url @@ -184,14 +173,27 @@ func TestBasicWakuV2(t *testing.T) { } fmt.Println("---------- GABRIEL 4 ----------") - config := &Config{} + fmt.Println("---------- enrTreeAddress: ", enrTreeAddress) */ + + // Instead of the Config type, use WakuConfig + /* config := &Config{} setDefaultConfig(config, false) config.Port = 0 config.Resolver = mapResolver(tree.ToTXT("n")) config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 - config.WakuNodes = []string{enrTreeAddress} - w, err := New(nil, "", config, nil, nil, nil, nil, nil) + config.WakuNodes = []string{enrTreeAddress} */ + + nwakuConfig := WakuConfig{ + // Host: cfg.Host, + Port: 30303, + NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: true, + LogLevel: "DEBUG", + DiscV5BootstrapNodes: []string{nwakuInfo.EnrUri}, + } + + w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) fmt.Println("---------- GABRIEL 5 ----------") require.NoError(t, err) require.NoError(t, w.Start()) @@ -203,6 +205,7 @@ func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 7 ----------") + /* // DNSDiscovery ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) defer cancel() @@ -212,6 +215,8 @@ func TestBasicWakuV2(t *testing.T) { discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrTreeAddress, dnsdisc.WithResolver(config.Resolver)) require.NoError(t, err) + fmt.Println("---------- discoveredNodes: ", discoveredNodes) + fmt.Println("---------- GABRIEL 9 ----------") // Peer used for retrieving history r, err := rand.Int(rand.Reader, big.NewInt(int64(len(discoveredNodes)))) @@ -219,6 +224,11 @@ func TestBasicWakuV2(t *testing.T) { storeNode := discoveredNodes[int(r.Int64())] + fmt.Println("---------- storeNode: ", storeNode) + */ + + // storeNode := discoveredNodes[int(r.Int64())] + fmt.Println("---------- GABRIEL 10 ----------") options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second @@ -236,7 +246,7 @@ func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 12 ----------") - // Dropping Peer + /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) @@ -309,7 +319,7 @@ func TestBasicWakuV2(t *testing.T) { } return nil }, options) - require.NoError(t, err) + require.NoError(t, err) */ require.NoError(t, w.Stop()) }