diff --git a/waku/nwaku.go b/waku/nwaku.go index dc77de5..689bd92 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -340,6 +340,7 @@ import ( const requestTimeout = 30 * time.Second const MsgChanBufferSize = 100 const TopicHealthChanBufferSize = 100 +const ConnectionChangeChanBufferSize = 100 type WakuConfig struct { Host string `json:"host,omitempty"` @@ -430,11 +431,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - config *WakuConfig - logger *zap.Logger - MsgChan chan common.Envelope - TopicHealthChan chan topicHealth + wakuCtx unsafe.Pointer + config *WakuConfig + logger *zap.Logger + MsgChan chan common.Envelope + TopicHealthChan chan topicHealth + ConnectionChangeChan chan connectionChange } func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { @@ -469,6 +471,7 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) + n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize) // Notice that the events for self node are handled by the 'MyEventCallback' method C.cGoWakuSetEventCallback(n.wakuCtx) @@ -526,6 +529,11 @@ type topicHealth struct { TopicHealth string `json:"topicHealth"` } +type connectionChange struct { + PeerId peer.ID `json:"peerId"` + PeerEvent string `json:"peerEvent"` +} + func (n *WakuNode) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) @@ -539,6 +547,8 @@ func (n *WakuNode) OnEvent(eventStr string) { n.parseMessageEvent(eventStr) case "relay_topic_health_change": n.parseTopicHealthChangeEvent(eventStr) + case "connection_change": + n.parseConnectionChangeEvent(eventStr) } } @@ -560,6 +570,16 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) { n.TopicHealthChan <- topicHealth } +func (n *WakuNode) parseConnectionChangeEvent(eventStr string) { + + connectionChange := connectionChange{} + err := json.Unmarshal([]byte(eventStr), &connectionChange) + if err != nil { + n.logger.Error("could not parse connection change", zap.Error(err)) + } + n.ConnectionChangeChan <- connectionChange +} + func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index d0975e1..65e7c9b 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -553,3 +553,89 @@ func TestTopicHealth(t *testing.T) { require.NoError(t, node2.Stop()) } + +func TestConnectionChange(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + clusterId := uint16(16) + shardId := uint16(64) + + // start node1 + wakuConfig1 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9060, + TcpPort: 60060, + } + + node1, err := New(&wakuConfig1, logger.Named("node1")) + require.NoError(t, err) + require.NoError(t, node1.Start()) + time.Sleep(1 * time.Second) + + // start node2 + wakuConfig2 := WakuConfig{ + Relay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: clusterId, + Shards: []uint16{shardId}, + Discv5UdpPort: 9061, + TcpPort: 60061, + } + node2, err := New(&wakuConfig2, logger.Named("node2")) + require.NoError(t, err) + require.NoError(t, node2.Start()) + time.Sleep(1 * time.Second) + multiaddr2, err := node2.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, multiaddr2) + + // node1 dials node2 so they become peers + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = node1.DialPeer(ctx, multiaddr2[0]) + require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that both nodes now have one connected peer + peerCount1, err := node1.PeerCount() + require.NoError(t, err) + require.True(t, peerCount1 == 1, "node1 should have 1 peer") + peerCount2, err := node2.PeerCount() + require.NoError(t, err) + require.True(t, peerCount2 == 1, "node2 should have 1 peer") + + peerId1, err := node1.node.PeerID() + require.NoError(t, err) + + // Wait to receive connectionChange event + select { + case connectionChange := <-node2.node.ConnectionChangeChan: + require.NotNil(t, connectionChange, "connectionChange should be updated") + require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted") + require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") + case <-time.After(10 * time.Second): + t.Fatal("Timeout: No connectionChange event received within 10 seconds") + } + + // Disconnect from node1 + err = node2.node.DisconnectPeerByID(peerId1) + require.NoError(t, err) + + // Wait to receive connectionChange event + select { + case connectionChange := <-node2.node.ConnectionChangeChan: + require.NotNil(t, connectionChange, "connectionChange should be updated") + require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted") + require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId") + case <-time.After(10 * time.Second): + t.Fatal("Timeout: No connectionChange event received within 10 seconds") + } + + // Stop nodes + require.NoError(t, node1.Stop()) + require.NoError(t, node2.Stop()) +}