mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-28 02:13:12 +00:00
start removing Waku type
This commit is contained in:
parent
96fb32a76e
commit
386c84b859
106
waku/nwaku.go
106
waku/nwaku.go
@ -417,79 +417,6 @@ func (rl RateLimit) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(rl.String())
|
||||
}
|
||||
|
||||
// Waku represents a dark communication interface through the Ethereum
|
||||
// network, using its very own P2P communication layer.
|
||||
type Waku struct {
|
||||
node *WakuNode
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
wakuCfg *WakuConfig
|
||||
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Start implements node.Service, starting the background data propagation thread
|
||||
// of the Waku protocol.
|
||||
func (w *Waku) Start() error {
|
||||
err := w.node.Start()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start nwaku node: %v", err)
|
||||
}
|
||||
|
||||
peerID, err := w.node.PeerID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.logger.Info("Waku PeerID", zap.Stringer("id", peerID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements node.Service, stopping the background data propagation thread
|
||||
// of the Waku protocol.
|
||||
func (w *Waku) Stop() error {
|
||||
w.cancel()
|
||||
|
||||
err := w.node.Stop()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.ctx = nil
|
||||
w.cancel = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Waku) PeerCount() (int, error) {
|
||||
return w.node.GetNumConnectedPeers()
|
||||
}
|
||||
|
||||
func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) {
|
||||
return w.node.ListenAddresses()
|
||||
}
|
||||
|
||||
func (w *Waku) DialPeer(ctx context.Context, address multiaddr.Multiaddr) error {
|
||||
// Using WakuConnect so it matches the go-waku's behavior and terminology
|
||||
return w.node.Connect(ctx, address)
|
||||
}
|
||||
|
||||
// TODO: change pubsub topic to shard notation everywhere
|
||||
func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
|
||||
return w.node.RelayPublish(ctx, message, pubsubTopic)
|
||||
}
|
||||
|
||||
func (w *Waku) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error {
|
||||
return w.node.DialPeerByID(ctx, peerID, protocol)
|
||||
}
|
||||
|
||||
func (w *Waku) DropPeer(peerID peer.ID) error {
|
||||
return w.node.DisconnectPeerByID(peerID)
|
||||
}
|
||||
|
||||
//export GoCallback
|
||||
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
if resp != nil {
|
||||
@ -505,6 +432,7 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
// WakuNode represents an instance of an nwaku node
|
||||
type WakuNode struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
wakuCfg *WakuConfig
|
||||
logger *zap.Logger
|
||||
cancel context.CancelFunc
|
||||
MsgChan chan common.Envelope
|
||||
@ -515,7 +443,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
n := &WakuNode{
|
||||
cancel: cancel,
|
||||
cancel: cancel,
|
||||
wakuCfg: config,
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
@ -564,34 +493,6 @@ func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// New creates a Waku client ready to communicate through the LibP2P network.
|
||||
func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
|
||||
var err error
|
||||
if logger == nil {
|
||||
logger, err = zap.NewDevelopment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Waku{
|
||||
node: wakunode,
|
||||
wakuCfg: nwakuCfg,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// The event callback sends back the node's ctx to know to which
|
||||
// node is the event being emited for. Since we only have a global
|
||||
// callback in the go side, We register all the nodes that we create
|
||||
@ -658,6 +559,7 @@ func (n *WakuNode) OnEvent(eventStr string) {
|
||||
}
|
||||
|
||||
func (n *WakuNode) parseMessageEvent(eventStr string) {
|
||||
fmt.Println("----------- got message event: ", eventStr)
|
||||
envelope, err := common.NewEnvelope(eventStr)
|
||||
if err != nil {
|
||||
n.logger.Error("could not parse message", zap.Error(err))
|
||||
|
||||
@ -50,11 +50,13 @@ func TestBasicWaku(t *testing.T) {
|
||||
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
w, err := New(&nwakuConfig, nil)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
w, err := newWakuNode(ctx, &nwakuConfig, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Start())
|
||||
|
||||
enr, err := w.node.ENR()
|
||||
enr, err := w.ENR()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, enr)
|
||||
|
||||
@ -64,7 +66,7 @@ func TestBasicWaku(t *testing.T) {
|
||||
|
||||
// Sanity check, not great, but it's probably helpful
|
||||
err = RetryWithBackOff(func() error {
|
||||
numConnected, err := w.node.GetNumConnectedPeers()
|
||||
numConnected, err := w.GetNumConnectedPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -87,28 +89,28 @@ func TestBasicWaku(t *testing.T) {
|
||||
*/
|
||||
|
||||
// Check that we are indeed connected to the store node
|
||||
connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
connectedStoreNodes, err := w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||
|
||||
// Disconnect from the store node
|
||||
err = w.node.DisconnectPeerByID(storeNode.ID)
|
||||
err = w.DisconnectPeerByID(storeNode.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that we are indeed disconnected
|
||||
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
|
||||
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
|
||||
|
||||
// Re-connect
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
ctx2, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel()
|
||||
err = w.DialPeer(ctx, storeNodeMa)
|
||||
err = w.Connect(ctx2, storeNodeMa)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that we are connected again
|
||||
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
connectedStoreNodes, err = w.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||
|
||||
@ -198,16 +200,18 @@ func TestPeerExchange(t *testing.T) {
|
||||
TcpPort: 60010,
|
||||
}
|
||||
|
||||
discV5Node, err := New(&discV5NodeWakuConfig, logger.Named("discV5Node"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
discV5Node, err := newWakuNode(ctx, &discV5NodeWakuConfig, logger.Named("discV5Node"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, discV5Node.Start())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
discV5NodePeerId, err := discV5Node.node.PeerID()
|
||||
discV5NodePeerId, err := discV5Node.PeerID()
|
||||
require.NoError(t, err)
|
||||
|
||||
discv5NodeEnr, err := discV5Node.node.ENR()
|
||||
discv5NodeEnr, err := discV5Node.ENR()
|
||||
require.NoError(t, err)
|
||||
|
||||
// start node which serves as PeerExchange server
|
||||
@ -223,7 +227,9 @@ func TestPeerExchange(t *testing.T) {
|
||||
TcpPort: 60011,
|
||||
}
|
||||
|
||||
pxServerNode, err := New(&pxServerWakuConfig, logger.Named("pxServerNode"))
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
defer cancel2()
|
||||
pxServerNode, err := newWakuNode(ctx2, &pxServerWakuConfig, logger.Named("pxServerNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, pxServerNode.Start())
|
||||
|
||||
@ -241,7 +247,7 @@ func TestPeerExchange(t *testing.T) {
|
||||
|
||||
// Check that pxServerNode has discV5Node in its Peer Store
|
||||
err = RetryWithBackOff(func() error {
|
||||
peers, err := pxServerNode.node.GetPeerIDsFromPeerStore()
|
||||
peers, err := pxServerNode.GetPeerIDsFromPeerStore()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@ -268,18 +274,20 @@ func TestPeerExchange(t *testing.T) {
|
||||
PeerExchangeNode: serverNodeMa[0].String(),
|
||||
}
|
||||
|
||||
lightNode, err := New(&pxClientWakuConfig, logger.Named("lightNode"))
|
||||
ctx3, cancel3 := context.WithCancel(context.Background())
|
||||
defer cancel3()
|
||||
lightNode, err := newWakuNode(ctx3, &pxClientWakuConfig, logger.Named("lightNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, lightNode.Start())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
pxServerPeerId, err := pxServerNode.node.PeerID()
|
||||
pxServerPeerId, err := pxServerNode.PeerID()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that the light node discovered the discV5Node and has both nodes in its peer store
|
||||
err = RetryWithBackOff(func() error {
|
||||
peers, err := lightNode.node.GetPeerIDsFromPeerStore()
|
||||
peers, err := lightNode.GetPeerIDsFromPeerStore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -293,7 +301,7 @@ func TestPeerExchange(t *testing.T) {
|
||||
|
||||
// Now perform the PX request manually to see if it also works
|
||||
err = RetryWithBackOff(func() error {
|
||||
numPeersReceived, err := lightNode.node.PeerExchangeRequest(1)
|
||||
numPeersReceived, err := lightNode.PeerExchangeRequest(1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -325,15 +333,17 @@ func TestDnsDiscover(t *testing.T) {
|
||||
Discv5UdpPort: 9020,
|
||||
TcpPort: 60020,
|
||||
}
|
||||
node, err := New(&nodeWakuConfig, logger.Named("node"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
node, err := newWakuNode(ctx, &nodeWakuConfig, logger.Named("node"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
|
||||
defer cancel()
|
||||
res, err := node.node.DnsDiscovery(ctx, sampleEnrTree, nameserver)
|
||||
ctx2, cancel2 := context.WithTimeout(context.TODO(), requestTimeout)
|
||||
defer cancel2()
|
||||
res, err := node.DnsDiscovery(ctx2, sampleEnrTree, nameserver)
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
|
||||
// Stop nodes
|
||||
@ -355,7 +365,9 @@ func TestDial(t *testing.T) {
|
||||
TcpPort: 60030,
|
||||
}
|
||||
|
||||
dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
dialerNode, err := newWakuNode(ctx, &dialerNodeWakuConfig, logger.Named("dialerNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, dialerNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -370,7 +382,9 @@ func TestDial(t *testing.T) {
|
||||
Discv5UdpPort: 9031,
|
||||
TcpPort: 60031,
|
||||
}
|
||||
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
defer cancel2()
|
||||
receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, receiverNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -378,23 +392,23 @@ func TestDial(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, receiverMultiaddr)
|
||||
// Check that both nodes start with no connected peers
|
||||
dialerPeerCount, err := dialerNode.PeerCount()
|
||||
dialerPeerCount, err := dialerNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")
|
||||
receiverPeerCount, err := receiverNode.PeerCount()
|
||||
receiverPeerCount, err := receiverNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
|
||||
// Dial
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel()
|
||||
err = dialerNode.DialPeer(ctx, receiverMultiaddr[0])
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel3()
|
||||
err = dialerNode.Connect(ctx3, receiverMultiaddr[0])
|
||||
require.NoError(t, err)
|
||||
time.Sleep(1 * time.Second)
|
||||
// Check that both nodes now have one connected peer
|
||||
dialerPeerCount, err = dialerNode.PeerCount()
|
||||
dialerPeerCount, err = dialerNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")
|
||||
receiverPeerCount, err = receiverNode.PeerCount()
|
||||
receiverPeerCount, err = receiverNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
|
||||
// Stop nodes
|
||||
@ -417,7 +431,9 @@ func TestRelay(t *testing.T) {
|
||||
TcpPort: 60040,
|
||||
}
|
||||
|
||||
senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
senderNode, err := newWakuNode(ctx, &senderNodeWakuConfig, logger.Named("senderNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, senderNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -432,7 +448,9 @@ func TestRelay(t *testing.T) {
|
||||
Discv5UdpPort: 9041,
|
||||
TcpPort: 60041,
|
||||
}
|
||||
receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
defer cancel2()
|
||||
receiverNode, err := newWakuNode(ctx2, &receiverNodeWakuConfig, logger.Named("receiverNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, receiverNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -441,16 +459,16 @@ func TestRelay(t *testing.T) {
|
||||
require.NotNil(t, receiverMultiaddr)
|
||||
|
||||
// Dial so they become peers
|
||||
ctx1, cancel1 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel1()
|
||||
err = senderNode.DialPeer(ctx1, receiverMultiaddr[0])
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel3()
|
||||
err = senderNode.Connect(ctx3, receiverMultiaddr[0])
|
||||
require.NoError(t, err)
|
||||
time.Sleep(1 * time.Second)
|
||||
// Check that both nodes now have one connected peer
|
||||
senderPeerCount, err := senderNode.PeerCount()
|
||||
senderPeerCount, err := senderNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer")
|
||||
receiverPeerCount, err := receiverNode.PeerCount()
|
||||
receiverPeerCount, err := receiverNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
|
||||
|
||||
@ -462,13 +480,13 @@ func TestRelay(t *testing.T) {
|
||||
}
|
||||
// send message
|
||||
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel2()
|
||||
senderNode.RelayPublish(ctx2, message, pubsubTopic)
|
||||
ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel4()
|
||||
senderNode.RelayPublish(ctx4, message, pubsubTopic)
|
||||
|
||||
// Wait to receive message
|
||||
select {
|
||||
case envelope := <-receiverNode.node.MsgChan:
|
||||
case envelope := <-receiverNode.MsgChan:
|
||||
require.NotNil(t, envelope, "Envelope should be received")
|
||||
require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
|
||||
require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
|
||||
@ -498,7 +516,9 @@ func TestTopicHealth(t *testing.T) {
|
||||
TcpPort: 60050,
|
||||
}
|
||||
|
||||
node1, err := New(&wakuConfig1, logger.Named("node1"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
node1, err := newWakuNode(ctx, &wakuConfig1, logger.Named("node1"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node1.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -513,7 +533,9 @@ func TestTopicHealth(t *testing.T) {
|
||||
Discv5UdpPort: 9051,
|
||||
TcpPort: 60051,
|
||||
}
|
||||
node2, err := New(&wakuConfig2, logger.Named("node2"))
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
defer cancel2()
|
||||
node2, err := newWakuNode(ctx2, &wakuConfig2, logger.Named("node2"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node2.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -522,20 +544,22 @@ func TestTopicHealth(t *testing.T) {
|
||||
require.NotNil(t, multiaddr2)
|
||||
|
||||
// node1 dials node2 so they become peers
|
||||
err = node1.DialPeer(multiaddr2[0])
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel3()
|
||||
err = node1.Connect(ctx3, multiaddr2[0])
|
||||
require.NoError(t, err)
|
||||
time.Sleep(1 * time.Second)
|
||||
// Check that both nodes now have one connected peer
|
||||
peerCount1, err := node1.PeerCount()
|
||||
peerCount1, err := node1.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, peerCount1 == 1, "node1 should have 1 peer")
|
||||
peerCount2, err := node2.PeerCount()
|
||||
peerCount2, err := node2.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
|
||||
|
||||
// Wait to receive topic health update
|
||||
select {
|
||||
case topicHealth := <-node2.node.TopicHealthChan:
|
||||
case topicHealth := <-node2.TopicHealthChan:
|
||||
require.NotNil(t, topicHealth, "topicHealth should be updated")
|
||||
require.Equal(t, topicHealth.TopicHealth, "MinimallyHealthy", "Topic health should be MinimallyHealthy")
|
||||
require.Equal(t, topicHealth.PubsubTopic, FormatWakuRelayTopic(clusterId, shardId), "PubsubTopic should match configured cluster and shard")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user