From 386c84b8599d822799fa74307ce951e7e369cb04 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 8 Jan 2025 18:53:26 +0100 Subject: [PATCH] start removing Waku type --- waku/nwaku.go | 106 ++------------------------------------- waku/nwaku_test.go | 120 +++++++++++++++++++++++++++------------------ 2 files changed, 76 insertions(+), 150 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 8434ea0..e0742c2 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -417,79 +417,6 @@ func (rl RateLimit) MarshalJSON() ([]byte, error) { return json.Marshal(rl.String()) } -// Waku represents a dark communication interface through the Ethereum -// network, using its very own P2P communication layer. -type Waku struct { - node *WakuNode - - ctx context.Context - cancel context.CancelFunc - - wakuCfg *WakuConfig - - logger *zap.Logger -} - -// Start implements node.Service, starting the background data propagation thread -// of the Waku protocol. -func (w *Waku) Start() error { - err := w.node.Start() - if err != nil { - return fmt.Errorf("failed to start nwaku node: %v", err) - } - - peerID, err := w.node.PeerID() - if err != nil { - return err - } - - w.logger.Info("Waku PeerID", zap.Stringer("id", peerID)) - - return nil -} - -// Stop implements node.Service, stopping the background data propagation thread -// of the Waku protocol. -func (w *Waku) Stop() error { - w.cancel() - - err := w.node.Stop() - if err != nil { - return err - } - - w.ctx = nil - w.cancel = nil - - return nil -} - -func (w *Waku) PeerCount() (int, error) { - return w.node.GetNumConnectedPeers() -} - -func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { - return w.node.ListenAddresses() -} - -func (w *Waku) DialPeer(ctx context.Context, address multiaddr.Multiaddr) error { - // Using WakuConnect so it matches the go-waku's behavior and terminology - return w.node.Connect(ctx, address) -} - -// TODO: change pubsub topic to shard notation everywhere -func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - return w.node.RelayPublish(ctx, message, pubsubTopic) -} - -func (w *Waku) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - return w.node.DialPeerByID(ctx, peerID, protocol) -} - -func (w *Waku) DropPeer(peerID peer.ID) error { - return w.node.DisconnectPeerByID(peerID) -} - //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if resp != nil { @@ -505,6 +432,7 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer + wakuCfg *WakuConfig logger *zap.Logger cancel context.CancelFunc MsgChan chan common.Envelope @@ -515,7 +443,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ - cancel: cancel, + cancel: cancel, + wakuCfg: config, } wg := sync.WaitGroup{} @@ -564,34 +493,6 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* return n, nil } -// New creates a Waku client ready to communicate through the LibP2P network. -func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { - var err error - if logger == nil { - logger, err = zap.NewDevelopment() - if err != nil { - return nil, err - } - } - - logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg)) - - ctx, cancel := context.WithCancel(context.Background()) - wakunode, err := newWakuNode(ctx, nwakuCfg, logger) - if err != nil { - cancel() - return nil, err - } - - return &Waku{ - node: wakunode, - wakuCfg: nwakuCfg, - logger: logger, - ctx: ctx, - cancel: cancel, - }, nil -} - // The event callback sends back the node's ctx to know to which // node is the event being emited for. Since we only have a global // callback in the go side, We register all the nodes that we create @@ -658,6 +559,7 @@ func (n *WakuNode) OnEvent(eventStr string) { } func (n *WakuNode) parseMessageEvent(eventStr string) { + fmt.Println("----------- got message event: ", eventStr) envelope, err := common.NewEnvelope(eventStr) if err != nil { n.logger.Error("could not parse message", zap.Error(err)) diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 4c5e6d2..3d26816 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -50,11 +50,13 @@ func TestBasicWaku(t *testing.T) { storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - w, err := New(&nwakuConfig, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + w, err := newWakuNode(ctx, &nwakuConfig, nil) require.NoError(t, err) require.NoError(t, w.Start()) - enr, err := w.node.ENR() + enr, err := w.ENR() require.NoError(t, err) require.NotNil(t, enr) @@ -64,7 +66,7 @@ func TestBasicWaku(t *testing.T) { // Sanity check, not great, but it's probably helpful err = RetryWithBackOff(func() error { - numConnected, err := w.node.GetNumConnectedPeers() + numConnected, err := w.GetNumConnectedPeers() if err != nil { return err } @@ -87,28 +89,28 @@ func TestBasicWaku(t *testing.T) { */ // Check that we are indeed connected to the store node - connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err := w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") // Disconnect from the store node - err = w.node.DisconnectPeerByID(storeNode.ID) + err = w.DisconnectPeerByID(storeNode.ID) require.NoError(t, err) // Check that we are indeed disconnected - connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") // Re-connect - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ctx2, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() - err = w.DialPeer(ctx, storeNodeMa) + err = w.Connect(ctx2, storeNodeMa) require.NoError(t, err) // Check that we are connected again - connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") @@ -198,16 +200,18 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60010, } - discV5Node, err := New(&discV5NodeWakuConfig, logger.Named("discV5Node")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discV5Node, err := newWakuNode(ctx, &discV5NodeWakuConfig, logger.Named("discV5Node")) require.NoError(t, err) require.NoError(t, discV5Node.Start()) time.Sleep(1 * time.Second) - discV5NodePeerId, err := discV5Node.node.PeerID() + discV5NodePeerId, err := discV5Node.PeerID() require.NoError(t, err) - discv5NodeEnr, err := discV5Node.node.ENR() + discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) // start node which serves as PeerExchange server @@ -223,7 +227,9 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60011, } - pxServerNode, err := New(&pxServerWakuConfig, logger.Named("pxServerNode")) + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + pxServerNode, err := newWakuNode(ctx2, &pxServerWakuConfig, logger.Named("pxServerNode")) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -241,7 +247,7 @@ func TestPeerExchange(t *testing.T) { // Check that pxServerNode has discV5Node in its Peer Store err = RetryWithBackOff(func() error { - peers, err := pxServerNode.node.GetPeerIDsFromPeerStore() + peers, err := pxServerNode.GetPeerIDsFromPeerStore() if err != nil { return err @@ -268,18 +274,20 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - lightNode, err := New(&pxClientWakuConfig, logger.Named("lightNode")) + ctx3, cancel3 := context.WithCancel(context.Background()) + defer cancel3() + lightNode, err := newWakuNode(ctx3, &pxClientWakuConfig, logger.Named("lightNode")) require.NoError(t, err) require.NoError(t, lightNode.Start()) time.Sleep(1 * time.Second) - pxServerPeerId, err := pxServerNode.node.PeerID() + pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) // Check that the light node discovered the discV5Node and has both nodes in its peer store err = RetryWithBackOff(func() error { - peers, err := lightNode.node.GetPeerIDsFromPeerStore() + peers, err := lightNode.GetPeerIDsFromPeerStore() if err != nil { return err } @@ -293,7 +301,7 @@ func TestPeerExchange(t *testing.T) { // Now perform the PX request manually to see if it also works err = RetryWithBackOff(func() error { - numPeersReceived, err := lightNode.node.PeerExchangeRequest(1) + numPeersReceived, err := lightNode.PeerExchangeRequest(1) if err != nil { return err } @@ -325,15 +333,17 @@ func TestDnsDiscover(t *testing.T) { Discv5UdpPort: 9020, TcpPort: 60020, } - node, err := New(&nodeWakuConfig, logger.Named("node")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + node, err := newWakuNode(ctx, &nodeWakuConfig, logger.Named("node")) require.NoError(t, err) require.NoError(t, node.Start()) time.Sleep(1 * time.Second) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel() - res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nameserver) + ctx2, cancel2 := context.WithTimeout(context.TODO(), requestTimeout) + defer cancel2() + res, err := node.DnsDiscovery(ctx2, sampleEnrTree, nameserver) require.NoError(t, err) require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query") // Stop nodes @@ -355,7 +365,9 @@ func TestDial(t *testing.T) { TcpPort: 60030, } - dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dialerNode, err := newWakuNode(ctx, &dialerNodeWakuConfig, logger.Named("dialerNode")) require.NoError(t, err) require.NoError(t, dialerNode.Start()) time.Sleep(1 * time.Second) @@ -370,7 +382,9 @@ func TestDial(t *testing.T) { Discv5UdpPort: 9031, TcpPort: 60031, } - receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode")) + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) time.Sleep(1 * time.Second) @@ -378,23 +392,23 @@ func TestDial(t *testing.T) { require.NoError(t, err) require.NotNil(t, receiverMultiaddr) // Check that both nodes start with no connected peers - dialerPeerCount, err := dialerNode.PeerCount() + dialerPeerCount, err := dialerNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") - receiverPeerCount, err := receiverNode.PeerCount() + receiverPeerCount, err := receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - defer cancel() - err = dialerNode.DialPeer(ctx, receiverMultiaddr[0]) + ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel3() + err = dialerNode.Connect(ctx3, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - dialerPeerCount, err = dialerNode.PeerCount() + dialerPeerCount, err = dialerNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") - receiverPeerCount, err = receiverNode.PeerCount() + receiverPeerCount, err = receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") // Stop nodes @@ -417,7 +431,9 @@ func TestRelay(t *testing.T) { TcpPort: 60040, } - senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + senderNode, err := newWakuNode(ctx, &senderNodeWakuConfig, logger.Named("senderNode")) require.NoError(t, err) require.NoError(t, senderNode.Start()) time.Sleep(1 * time.Second) @@ -432,7 +448,9 @@ func TestRelay(t *testing.T) { Discv5UdpPort: 9041, TcpPort: 60041, } - receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode")) + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) time.Sleep(1 * time.Second) @@ -441,16 +459,16 @@ func TestRelay(t *testing.T) { require.NotNil(t, receiverMultiaddr) // Dial so they become peers - ctx1, cancel1 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel1() - err = senderNode.DialPeer(ctx1, receiverMultiaddr[0]) + ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel3() + err = senderNode.Connect(ctx3, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - senderPeerCount, err := senderNode.PeerCount() + senderPeerCount, err := senderNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer") - receiverPeerCount, err := receiverNode.PeerCount() + receiverPeerCount, err := receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") @@ -462,13 +480,13 @@ func TestRelay(t *testing.T) { } // send message pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) - ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel2() - senderNode.RelayPublish(ctx2, message, pubsubTopic) + ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel4() + senderNode.RelayPublish(ctx4, message, pubsubTopic) // Wait to receive message select { - case envelope := <-receiverNode.node.MsgChan: + case envelope := <-receiverNode.MsgChan: require.NotNil(t, envelope, "Envelope should be received") require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match") require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match") @@ -498,7 +516,9 @@ func TestTopicHealth(t *testing.T) { TcpPort: 60050, } - node1, err := New(&wakuConfig1, logger.Named("node1")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + node1, err := newWakuNode(ctx, &wakuConfig1, logger.Named("node1")) require.NoError(t, err) require.NoError(t, node1.Start()) time.Sleep(1 * time.Second) @@ -513,7 +533,9 @@ func TestTopicHealth(t *testing.T) { Discv5UdpPort: 9051, TcpPort: 60051, } - node2, err := New(&wakuConfig2, logger.Named("node2")) + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + node2, err := newWakuNode(ctx2, &wakuConfig2, logger.Named("node2")) require.NoError(t, err) require.NoError(t, node2.Start()) time.Sleep(1 * time.Second) @@ -522,20 +544,22 @@ func TestTopicHealth(t *testing.T) { require.NotNil(t, multiaddr2) // node1 dials node2 so they become peers - err = node1.DialPeer(multiaddr2[0]) + ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel3() + err = node1.Connect(ctx3, multiaddr2[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - peerCount1, err := node1.PeerCount() + peerCount1, err := node1.GetNumConnectedPeers() require.NoError(t, err) require.True(t, peerCount1 == 1, "node1 should have 1 peer") - peerCount2, err := node2.PeerCount() + peerCount2, err := node2.GetNumConnectedPeers() require.NoError(t, err) require.True(t, peerCount2 == 1, "node2 should have 1 peer") // Wait to receive topic health update select { - case topicHealth := <-node2.node.TopicHealthChan: + case topicHealth := <-node2.TopicHealthChan: require.NotNil(t, topicHealth, "topicHealth should be updated") require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy") require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard")