chore: code cleanup (#15)

This commit is contained in:
gabrielmer 2025-01-10 11:21:50 +01:00 committed by GitHub
parent 96fb32a76e
commit b7368a9e73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 163 deletions

View File

@ -317,7 +317,6 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
@ -417,79 +416,6 @@ func (rl RateLimit) MarshalJSON() ([]byte, error) {
return json.Marshal(rl.String())
}
// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
node *WakuNode
ctx context.Context
cancel context.CancelFunc
wakuCfg *WakuConfig
logger *zap.Logger
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start() error {
err := w.node.Start()
if err != nil {
return fmt.Errorf("failed to start nwaku node: %v", err)
}
peerID, err := w.node.PeerID()
if err != nil {
return err
}
w.logger.Info("Waku PeerID", zap.Stringer("id", peerID))
return nil
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
w.cancel()
err := w.node.Stop()
if err != nil {
return err
}
w.ctx = nil
w.cancel = nil
return nil
}
func (w *Waku) PeerCount() (int, error) {
return w.node.GetNumConnectedPeers()
}
func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) {
return w.node.ListenAddresses()
}
func (w *Waku) DialPeer(ctx context.Context, address multiaddr.Multiaddr) error {
// Using WakuConnect so it matches the go-waku's behavior and terminology
return w.node.Connect(ctx, address)
}
// TODO: change pubsub topic to shard notation everywhere
func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
return w.node.RelayPublish(ctx, message, pubsubTopic)
}
func (w *Waku) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error {
return w.node.DialPeerByID(ctx, peerID, protocol)
}
func (w *Waku) DropPeer(peerID peer.ID) error {
return w.node.DisconnectPeerByID(peerID)
}
//export GoCallback
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp != nil {
@ -505,33 +431,20 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
config *WakuConfig
logger *zap.Logger
cancel context.CancelFunc
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
}
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
ctx, cancel := context.WithCancel(ctx)
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
n := &WakuNode{
cancel: cancel,
config: config,
logger: logger,
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// defer gocommon.LogOnPanic()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Done()
<-ctx.Done()
}()
wg.Wait()
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -552,10 +465,10 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
wg.Wait()
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.logger = logger.Named("nwaku")
wg.Wait()
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
@ -564,34 +477,6 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
return n, nil
}
// New creates a Waku client ready to communicate through the LibP2P network.
func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
var err error
if logger == nil {
logger, err = zap.NewDevelopment()
if err != nil {
return nil, err
}
}
logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg))
ctx, cancel := context.WithCancel(context.Background())
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
if err != nil {
cancel()
return nil, err
}
return &Waku{
node: wakunode,
wakuCfg: nwakuCfg,
logger: logger,
ctx: ctx,
cancel: cancel,
}, nil
}
// The event callback sends back the node's ctx to know to which
// node is the event being emited for. Since we only have a global
// callback in the go side, We register all the nodes that we create

View File

@ -35,6 +35,9 @@ func TestBasicWaku(t *testing.T) {
// ctx := context.Background()
logger, err := zap.NewDevelopment()
require.NoError(t, err)
nwakuConfig := WakuConfig{
Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
Relay: true,
@ -50,11 +53,11 @@ func TestBasicWaku(t *testing.T) {
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)
w, err := New(&nwakuConfig, nil)
w, err := NewWakuNode(&nwakuConfig, logger.Named("nwaku"))
require.NoError(t, err)
require.NoError(t, w.Start())
enr, err := w.node.ENR()
enr, err := w.ENR()
require.NoError(t, err)
require.NotNil(t, enr)
@ -64,7 +67,7 @@ func TestBasicWaku(t *testing.T) {
// Sanity check, not great, but it's probably helpful
err = RetryWithBackOff(func() error {
numConnected, err := w.node.GetNumConnectedPeers()
numConnected, err := w.GetNumConnectedPeers()
if err != nil {
return err
}
@ -87,16 +90,16 @@ func TestBasicWaku(t *testing.T) {
*/
// Check that we are indeed connected to the store node
connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
connectedStoreNodes, err := w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
require.NoError(t, err)
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
// Disconnect from the store node
err = w.node.DisconnectPeerByID(storeNode.ID)
err = w.DisconnectPeerByID(storeNode.ID)
require.NoError(t, err)
// Check that we are indeed disconnected
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
require.NoError(t, err)
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
@ -104,11 +107,11 @@ func TestBasicWaku(t *testing.T) {
// Re-connect
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = w.DialPeer(ctx, storeNodeMa)
err = w.Connect(ctx, storeNodeMa)
require.NoError(t, err)
// Check that we are connected again
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
require.NoError(t, err)
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
@ -198,16 +201,16 @@ func TestPeerExchange(t *testing.T) {
TcpPort: 60010,
}
discV5Node, err := New(&discV5NodeWakuConfig, logger.Named("discV5Node"))
discV5Node, err := NewWakuNode(&discV5NodeWakuConfig, logger.Named("discV5Node"))
require.NoError(t, err)
require.NoError(t, discV5Node.Start())
time.Sleep(1 * time.Second)
discV5NodePeerId, err := discV5Node.node.PeerID()
discV5NodePeerId, err := discV5Node.PeerID()
require.NoError(t, err)
discv5NodeEnr, err := discV5Node.node.ENR()
discv5NodeEnr, err := discV5Node.ENR()
require.NoError(t, err)
// start node which serves as PeerExchange server
@ -223,7 +226,7 @@ func TestPeerExchange(t *testing.T) {
TcpPort: 60011,
}
pxServerNode, err := New(&pxServerWakuConfig, logger.Named("pxServerNode"))
pxServerNode, err := NewWakuNode(&pxServerWakuConfig, logger.Named("pxServerNode"))
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())
@ -241,7 +244,7 @@ func TestPeerExchange(t *testing.T) {
// Check that pxServerNode has discV5Node in its Peer Store
err = RetryWithBackOff(func() error {
peers, err := pxServerNode.node.GetPeerIDsFromPeerStore()
peers, err := pxServerNode.GetPeerIDsFromPeerStore()
if err != nil {
return err
@ -268,18 +271,18 @@ func TestPeerExchange(t *testing.T) {
PeerExchangeNode: serverNodeMa[0].String(),
}
lightNode, err := New(&pxClientWakuConfig, logger.Named("lightNode"))
lightNode, err := NewWakuNode(&pxClientWakuConfig, logger.Named("lightNode"))
require.NoError(t, err)
require.NoError(t, lightNode.Start())
time.Sleep(1 * time.Second)
pxServerPeerId, err := pxServerNode.node.PeerID()
pxServerPeerId, err := pxServerNode.PeerID()
require.NoError(t, err)
// Check that the light node discovered the discV5Node and has both nodes in its peer store
err = RetryWithBackOff(func() error {
peers, err := lightNode.node.GetPeerIDsFromPeerStore()
peers, err := lightNode.GetPeerIDsFromPeerStore()
if err != nil {
return err
}
@ -293,7 +296,7 @@ func TestPeerExchange(t *testing.T) {
// Now perform the PX request manually to see if it also works
err = RetryWithBackOff(func() error {
numPeersReceived, err := lightNode.node.PeerExchangeRequest(1)
numPeersReceived, err := lightNode.PeerExchangeRequest(1)
if err != nil {
return err
}
@ -325,7 +328,8 @@ func TestDnsDiscover(t *testing.T) {
Discv5UdpPort: 9020,
TcpPort: 60020,
}
node, err := New(&nodeWakuConfig, logger.Named("node"))
node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node"))
require.NoError(t, err)
require.NoError(t, node.Start())
time.Sleep(1 * time.Second)
@ -333,7 +337,7 @@ func TestDnsDiscover(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
defer cancel()
res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nameserver)
res, err := node.DnsDiscovery(ctx, sampleEnrTree, nameserver)
require.NoError(t, err)
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
// Stop nodes
@ -355,10 +359,9 @@ func TestDial(t *testing.T) {
TcpPort: 60030,
}
dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode"))
dialerNode, err := NewWakuNode(&dialerNodeWakuConfig, logger.Named("dialerNode"))
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
time.Sleep(1 * time.Second)
// start node that will receive the dial
receiverNodeWakuConfig := WakuConfig{
@ -370,31 +373,31 @@ func TestDial(t *testing.T) {
Discv5UdpPort: 9031,
TcpPort: 60031,
}
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)
// Check that both nodes start with no connected peers
dialerPeerCount, err := dialerNode.PeerCount()
dialerPeerCount, err := dialerNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")
receiverPeerCount, err := receiverNode.PeerCount()
receiverPeerCount, err := receiverNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
// Dial
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = dialerNode.DialPeer(ctx, receiverMultiaddr[0])
err = dialerNode.Connect(ctx, receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
dialerPeerCount, err = dialerNode.PeerCount()
dialerPeerCount, err = dialerNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")
receiverPeerCount, err = receiverNode.PeerCount()
receiverPeerCount, err = receiverNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
// Stop nodes
@ -417,7 +420,7 @@ func TestRelay(t *testing.T) {
TcpPort: 60040,
}
senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode"))
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
require.NoError(t, err)
require.NoError(t, senderNode.Start())
time.Sleep(1 * time.Second)
@ -432,7 +435,7 @@ func TestRelay(t *testing.T) {
Discv5UdpPort: 9041,
TcpPort: 60041,
}
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
@ -441,16 +444,16 @@ func TestRelay(t *testing.T) {
require.NotNil(t, receiverMultiaddr)
// Dial so they become peers
ctx1, cancel1 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel1()
err = senderNode.DialPeer(ctx1, receiverMultiaddr[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = senderNode.Connect(ctx, receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
senderPeerCount, err := senderNode.PeerCount()
senderPeerCount, err := senderNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer")
receiverPeerCount, err := receiverNode.PeerCount()
receiverPeerCount, err := receiverNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
@ -468,7 +471,7 @@ func TestRelay(t *testing.T) {
// Wait to receive message
select {
case envelope := <-receiverNode.node.MsgChan:
case envelope := <-receiverNode.MsgChan:
require.NotNil(t, envelope, "Envelope should be received")
require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
@ -498,7 +501,7 @@ func TestTopicHealth(t *testing.T) {
TcpPort: 60050,
}
node1, err := New(&wakuConfig1, logger.Named("node1"))
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
require.NoError(t, err)
require.NoError(t, node1.Start())
time.Sleep(1 * time.Second)
@ -513,7 +516,7 @@ func TestTopicHealth(t *testing.T) {
Discv5UdpPort: 9051,
TcpPort: 60051,
}
node2, err := New(&wakuConfig2, logger.Named("node2"))
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
require.NoError(t, err)
require.NoError(t, node2.Start())
time.Sleep(1 * time.Second)
@ -522,20 +525,22 @@ func TestTopicHealth(t *testing.T) {
require.NotNil(t, multiaddr2)
// node1 dials node2 so they become peers
err = node1.DialPeer(multiaddr2[0])
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = node1.Connect(ctx, multiaddr2[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
peerCount1, err := node1.PeerCount()
peerCount1, err := node1.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, peerCount1 == 1, "node1 should have 1 peer")
peerCount2, err := node2.PeerCount()
peerCount2, err := node2.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
// Wait to receive topic health update
select {
case topicHealth := <-node2.node.TopicHealthChan:
case topicHealth := <-node2.TopicHealthChan:
require.NotNil(t, topicHealth, "topicHealth should be updated")
require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy")
require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard")