From 477a11ff80bf239d391d029f2d93e5bd258cd628 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:08:31 +0100 Subject: [PATCH] feat: integrating connection change event (#14) --- waku/nwaku.go | 30 +++++++++++--- waku/nwaku_test.go | 98 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 114 insertions(+), 14 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index dc77de5..689bd92 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -340,6 +340,7 @@ import ( const requestTimeout = 30 * time.Second const MsgChanBufferSize = 100 const TopicHealthChanBufferSize = 100 +const ConnectionChangeChanBufferSize = 100 type WakuConfig struct { Host string `json:"host,omitempty"` @@ -430,11 +431,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - config *WakuConfig - logger *zap.Logger - MsgChan chan common.Envelope - TopicHealthChan chan topicHealth + wakuCtx unsafe.Pointer + config *WakuConfig + logger *zap.Logger + MsgChan chan common.Envelope + TopicHealthChan chan topicHealth + ConnectionChangeChan chan connectionChange } func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { @@ -469,6 +471,7 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) + n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize) // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) @@ -526,6 +529,11 @@ type topicHealth struct { TopicHealth string `json:"topicHealth"` } +type connectionChange struct { + PeerId peer.ID `json:"peerId"` + PeerEvent string `json:"peerEvent"` +} + func (n *WakuNode) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) @@ -539,6 +547,8 @@ func (n *WakuNode) OnEvent(eventStr string) { n.parseMessageEvent(eventStr) case "relay_topic_health_change": n.parseTopicHealthChangeEvent(eventStr) + case "connection_change": + n.parseConnectionChangeEvent(eventStr) } } @@ -560,6 +570,16 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { n.TopicHealthChan <- topicHealth } +func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { + + connectionChange := connectionChange{} + err := json.Unmarshal([]byte(eventStr), &connectionChange) + if err != nil { + n.logger.Error("could not parse connection change", zap.Error(err)) + } + n.ConnectionChangeChan <- connectionChange +} + func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index d0975e1..09db846 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -205,8 +205,6 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) require.NoError(t, discV5Node.Start()) - time.Sleep(1 * time.Second) - discV5NodePeerId, err := discV5Node.PeerID() require.NoError(t, err) @@ -236,6 +234,7 @@ func TestPeerExchange(t *testing.T) { serverNodeMa, err := pxServerNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, serverNodeMa) + require.True(t, len(serverNodeMa) > 0) // Sanity check, not great, but it's probably helpful options := func(b *backoff.ExponentialBackOff) { @@ -275,8 +274,6 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) require.NoError(t, lightNode.Start()) - time.Sleep(1 * time.Second) - pxServerPeerId, err := pxServerNode.PeerID() require.NoError(t, err) @@ -332,7 +329,6 @@ func TestDnsDiscover(t *testing.T) { node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node")) require.NoError(t, err) require.NoError(t, node.Start()) - time.Sleep(1 * time.Second) sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im" ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) @@ -380,6 +376,7 @@ func TestDial(t *testing.T) { receiverMultiaddr, err := receiverNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, receiverMultiaddr) + require.True(t, len(receiverMultiaddr) > 0) // Check that both nodes start with no connected peers dialerPeerCount, err := dialerNode.GetNumConnectedPeers() require.NoError(t, err) @@ -423,7 +420,6 @@ func TestRelay(t *testing.T) { senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) require.NoError(t, err) require.NoError(t, senderNode.Start()) - time.Sleep(1 * time.Second) // start node that will receive the message receiverNodeWakuConfig := WakuConfig{ @@ -438,10 +434,10 @@ func TestRelay(t *testing.T) { receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) require.NoError(t, err) require.NoError(t, receiverNode.Start()) - time.Sleep(1 * time.Second) receiverMultiaddr, err := receiverNode.ListenAddresses() require.NoError(t, err) require.NotNil(t, receiverMultiaddr) + require.True(t, len(receiverMultiaddr) > 0) // Dial so they become peers ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -504,7 +500,6 @@ func TestTopicHealth(t *testing.T) { node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) require.NoError(t, err) require.NoError(t, node1.Start()) - time.Sleep(1 * time.Second) // start node2 wakuConfig2 := WakuConfig{ @@ -519,10 +514,10 @@ func TestTopicHealth(t *testing.T) { node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) require.NoError(t, err) require.NoError(t, node2.Start()) - time.Sleep(1 * time.Second) multiaddr2, err := node2.ListenAddresses() require.NoError(t, err) require.NotNil(t, multiaddr2) + require.True(t, len(multiaddr2) > 0) // node1 dials node2 so they become peers ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -553,3 +548,88 @@ func TestTopicHealth(t *testing.T) { require.NoError(t, node2.Stop()) } + +func TestConnectionChange(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + clusterId := uint16(16) + shardId := uint16(64) + + // start node1 + wakuConfig1 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9060, + TcpPort: 60060, + } + + node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1")) + require.NoError(t, err) + require.NoError(t, node1.Start()) + + // start node2 + wakuConfig2 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9061, + TcpPort: 60061, + } + node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2")) + require.NoError(t, err) + require.NoError(t, node2.Start()) + multiaddr2, err := node2.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, multiaddr2) + require.True(t, len(multiaddr2) > 0) + + // node1 dials node2 so they become peers + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = node1.Connect(ctx, multiaddr2[0]) + require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that both nodes now have one connected peer + peerCount1, err := node1.GetNumConnectedPeers() + require.NoError(t, err) + require.True(t, peerCount1 == 1, "node1 should have 1 peer") + peerCount2, err := node2.GetNumConnectedPeers() + require.NoError(t, err) + require.True(t, peerCount2 == 1, "node2 should have 1 peer") + + peerId1, err := node1.PeerID() + require.NoError(t, err) + + // Wait to receive connectionChange event + select { + case connectionChange := <-node2.ConnectionChangeChan: + require.NotNil(t, connectionChange, "connectionChange should be updated") + require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted") + require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") + case <-time.After(10 * time.Second): + t.Fatal("Timeout: No connectionChange event received within 10 seconds") + } + + // Disconnect from node1 + err = node2.DisconnectPeerByID(peerId1) + require.NoError(t, err) + + // Wait to receive connectionChange event + select { + case connectionChange := <-node2.ConnectionChangeChan: + require.NotNil(t, connectionChange, "connectionChange should be updated") + require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted") + require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") + case <-time.After(10 * time.Second): + t.Fatal("Timeout: No connectionChange event received within 10 seconds") + } + + // Stop nodes + require.NoError(t, node1.Stop()) + require.NoError(t, node2.Stop()) +}