diff --git a/waku/nwaku.go b/waku/nwaku.go index 8434ea0..dc77de5 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -317,7 +317,6 @@ import ( "encoding/json" "errors" "fmt" - "runtime" "strconv" "strings" "sync" @@ -417,79 +416,6 @@ func (rl RateLimit) MarshalJSON() ([]byte, error) { return json.Marshal(rl.String()) } -// Waku represents a dark communication interface through the Ethereum -// network, using its very own P2P communication layer. -type Waku struct { - node *WakuNode - - ctx context.Context - cancel context.CancelFunc - - wakuCfg *WakuConfig - - logger *zap.Logger -} - -// Start implements node.Service, starting the background data propagation thread -// of the Waku protocol. -func (w *Waku) Start() error { - err := w.node.Start() - if err != nil { - return fmt.Errorf("failed to start nwaku node: %v", err) - } - - peerID, err := w.node.PeerID() - if err != nil { - return err - } - - w.logger.Info("Waku PeerID", zap.Stringer("id", peerID)) - - return nil -} - -// Stop implements node.Service, stopping the background data propagation thread -// of the Waku protocol. -func (w *Waku) Stop() error { - w.cancel() - - err := w.node.Stop() - if err != nil { - return err - } - - w.ctx = nil - w.cancel = nil - - return nil -} - -func (w *Waku) PeerCount() (int, error) { - return w.node.GetNumConnectedPeers() -} - -func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { - return w.node.ListenAddresses() -} - -func (w *Waku) DialPeer(ctx context.Context, address multiaddr.Multiaddr) error { - // Using WakuConnect so it matches the go-waku's behavior and terminology - return w.node.Connect(ctx, address) -} - -// TODO: change pubsub topic to shard notation everywhere -func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - return w.node.RelayPublish(ctx, message, pubsubTopic) -} - -func (w *Waku) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - return w.node.DialPeerByID(ctx, peerID, protocol) -} - -func (w *Waku) DropPeer(peerID peer.ID) error { - return w.node.DisconnectPeerByID(peerID) -} - //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if resp != nil { @@ -505,33 +431,20 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer + config *WakuConfig logger *zap.Logger - cancel context.CancelFunc MsgChan chan common.Envelope TopicHealthChan chan topicHealth } -func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { - ctx, cancel := context.WithCancel(ctx) +func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n := &WakuNode{ - cancel: cancel, + config: config, + logger: logger, } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - // defer gocommon.LogOnPanic() - - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - wg.Done() - - <-ctx.Done() - }() - - wg.Wait() jsonConfig, err := json.Marshal(config) if err != nil { @@ -552,10 +465,10 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) - n.logger = logger.Named("nwaku") - wg.Wait() // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) @@ -564,34 +477,6 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (* return n, nil } -// New creates a Waku client ready to communicate through the LibP2P network. -func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { - var err error - if logger == nil { - logger, err = zap.NewDevelopment() - if err != nil { - return nil, err - } - } - - logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg)) - - ctx, cancel := context.WithCancel(context.Background()) - wakunode, err := newWakuNode(ctx, nwakuCfg, logger) - if err != nil { - cancel() - return nil, err - } - - return &Waku{ - node: wakunode, - wakuCfg: nwakuCfg, - logger: logger, - ctx: ctx, - cancel: cancel, - }, nil -} - // The event callback sends back the node's ctx to know to which // node is the event being emited for. Since we only have a global // callback in the go side, We register all the nodes that we create diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 4c5e6d2..d0975e1 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -35,6 +35,9 @@ func TestBasicWaku(t *testing.T) { // ctx := context.Background() + logger, err := zap.NewDevelopment() + require.NoError(t, err) + nwakuConfig := WakuConfig{ Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", Relay: true, @@ -50,11 +53,11 @@ func TestBasicWaku(t *testing.T) { storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - w, err := New(&nwakuConfig, nil) + w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku")) require.NoError(t, err) require.NoError(t, w.Start()) - enr, err := w.node.ENR() + enr, err := w.ENR() require.NoError(t, err) require.NotNil(t, enr) @@ -64,7 +67,7 @@ func TestBasicWaku(t *testing.T) { // Sanity check, not great, but it's probably helpful err = RetryWithBackOff(func() error { - numConnected, err := w.node.GetNumConnectedPeers() + numConnected, err := w.GetNumConnectedPeers() if err != nil { return err } @@ -87,16 +90,16 @@ func TestBasicWaku(t *testing.T) { */ // Check that we are indeed connected to the store node - connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err := w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") // Disconnect from the store node - err = w.node.DisconnectPeerByID(storeNode.ID) + err = w.DisconnectPeerByID(storeNode.ID) require.NoError(t, err) // Check that we are indeed disconnected - connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") @@ -104,11 +107,11 @@ func TestBasicWaku(t *testing.T) { // Re-connect ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() - err = w.DialPeer(ctx, storeNodeMa) + err = w.Connect(ctx, storeNodeMa) require.NoError(t, err) // Check that we are connected again - connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300) + connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") @@ -198,16 +201,16 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60010, } - discV5Node, err := New(&discV5NodeWakuConfig, logger.Named("discV5Node")) + discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node")) require.NoError(t, err) require.NoError(t, discV5Node.Start()) time.Sleep(1 * time.Second) - discV5NodePeerId, err := discV5Node.node.PeerID() + discV5NodePeerId, err := discV5Node.PeerID() require.NoError(t, err) - discv5NodeEnr, err := discV5Node.node.ENR() + discv5NodeEnr, err := discV5Node.ENR() require.NoError(t, err) // start node which serves as PeerExchange server @@ -223,7 +226,7 @@ func TestPeerExchange(t *testing.T) { TcpPort: 60011, } - pxServerNode, err := New(&pxServerWakuConfig, logger.Named("pxServerNode")) + pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode")) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -241,7 +244,7 @@ func TestPeerExchange(t *testing.T) { // Check that pxServerNode has discV5Node in its Peer Store err = RetryWithBackOff(func() error { - peers, err := pxServerNode.node.GetPeerIDsFromPeerStore() + peers, err := pxServerNode.GetPeerIDsFromPeerStore() if err != nil { return err @@ -268,18 +271,18 @@ func TestPeerExchange(t *testing.T) { PeerExchangeNode: serverNodeMa[0].String(), } - lightNode, err := New(&pxClientWakuConfig, logger.Named("lightNode")) + lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode")) require.NoError(t, err) require.NoError(t, lightNode.Start()) time.Sleep(1 * time.Second) - pxServerPeerId, err := pxServerNode.node.PeerID() + pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) // Check that the light node discovered the discV5Node and has both nodes in its peer store err = RetryWithBackOff(func() error { - peers, err := lightNode.node.GetPeerIDsFromPeerStore() + peers, err := lightNode.GetPeerIDsFromPeerStore() if err != nil { return err } @@ -293,7 +296,7 @@ func TestPeerExchange(t *testing.T) { // Now perform the PX request manually to see if it also works err = RetryWithBackOff(func() error { - numPeersReceived, err := lightNode.node.PeerExchangeRequest(1) + numPeersReceived, err := lightNode.PeerExchangeRequest(1) if err != nil { return err } @@ -325,7 +328,8 @@ func TestDnsDiscover(t *testing.T) { Discv5UdpPort: 9020, TcpPort: 60020, } - node, err := New(&nodeWakuConfig, logger.Named("node")) + + node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node")) require.NoError(t, err) require.NoError(t, node.Start()) time.Sleep(1 * time.Second) @@ -333,7 +337,7 @@ func TestDnsDiscover(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) defer cancel() - res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nameserver) + res, err := node.DnsDiscovery(ctx, sampleEnrTree, nameserver) require.NoError(t, err) require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query") // Stop nodes @@ -355,10 +359,9 @@ func TestDial(t *testing.T) { TcpPort: 60030, } - dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode")) + dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode")) require.NoError(t, err) require.NoError(t, dialerNode.Start()) - time.Sleep(1 * time.Second) // start node that will receive the dial receiverNodeWakuConfig := WakuConfig{ @@ -370,31 +373,31 @@ func TestDial(t *testing.T) { Discv5UdpPort: 9031, TcpPort: 60031, } - receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode")) + + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) - time.Sleep(1 * time.Second) receiverMultiaddr, err := receiverNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, receiverMultiaddr) // Check that both nodes start with no connected peers - dialerPeerCount, err := dialerNode.PeerCount() + dialerPeerCount, err := dialerNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") - receiverPeerCount, err := receiverNode.PeerCount() + receiverPeerCount, err := receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") // Dial ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() - err = dialerNode.DialPeer(ctx, receiverMultiaddr[0]) + err = dialerNode.Connect(ctx, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - dialerPeerCount, err = dialerNode.PeerCount() + dialerPeerCount, err = dialerNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") - receiverPeerCount, err = receiverNode.PeerCount() + receiverPeerCount, err = receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") // Stop nodes @@ -417,7 +420,7 @@ func TestRelay(t *testing.T) { TcpPort: 60040, } - senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode")) + senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) require.NoError(t, err) require.NoError(t, senderNode.Start()) time.Sleep(1 * time.Second) @@ -432,7 +435,7 @@ func TestRelay(t *testing.T) { Discv5UdpPort: 9041, TcpPort: 60041, } - receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode")) + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) time.Sleep(1 * time.Second) @@ -441,16 +444,16 @@ func TestRelay(t *testing.T) { require.NotNil(t, receiverMultiaddr) // Dial so they become peers - ctx1, cancel1 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel1() - err = senderNode.DialPeer(ctx1, receiverMultiaddr[0]) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = senderNode.Connect(ctx, receiverMultiaddr[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - senderPeerCount, err := senderNode.PeerCount() + senderPeerCount, err := senderNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer") - receiverPeerCount, err := receiverNode.PeerCount() + receiverPeerCount, err := receiverNode.GetNumConnectedPeers() require.NoError(t, err) require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") @@ -468,7 +471,7 @@ func TestRelay(t *testing.T) { // Wait to receive message select { - case envelope := <-receiverNode.node.MsgChan: + case envelope := <-receiverNode.MsgChan: require.NotNil(t, envelope, "Envelope should be received") require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match") require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match") @@ -498,7 +501,7 @@ func TestTopicHealth(t *testing.T) { TcpPort: 60050, } - node1, err := New(&wakuConfig1, logger.Named("node1")) + node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) require.NoError(t, err) require.NoError(t, node1.Start()) time.Sleep(1 * time.Second) @@ -513,7 +516,7 @@ func TestTopicHealth(t *testing.T) { Discv5UdpPort: 9051, TcpPort: 60051, } - node2, err := New(&wakuConfig2, logger.Named("node2")) + node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) require.NoError(t, err) require.NoError(t, node2.Start()) time.Sleep(1 * time.Second) @@ -522,20 +525,22 @@ func TestTopicHealth(t *testing.T) { require.NotNil(t, multiaddr2) // node1 dials node2 so they become peers - err = node1.DialPeer(multiaddr2[0]) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = node1.Connect(ctx, multiaddr2[0]) require.NoError(t, err) time.Sleep(1 * time.Second) // Check that both nodes now have one connected peer - peerCount1, err := node1.PeerCount() + peerCount1, err := node1.GetNumConnectedPeers() require.NoError(t, err) require.True(t, peerCount1 == 1, "node1 should have 1 peer") - peerCount2, err := node2.PeerCount() + peerCount2, err := node2.GetNumConnectedPeers() require.NoError(t, err) require.True(t, peerCount2 == 1, "node2 should have 1 peer") // Wait to receive topic health update select { - case topicHealth := <-node2.node.TopicHealthChan: + case topicHealth := <-node2.TopicHealthChan: require.NotNil(t, topicHealth, "topicHealth should be updated") require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy") require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard")