integrating connectionChange event

This commit is contained in:
Gabriel mermelstein 2025-01-08 11:55:04 +01:00
parent b7368a9e73
commit 6915bc6a5c
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
2 changed files with 111 additions and 5 deletions

View File

@ -340,6 +340,7 @@ import (
const requestTimeout = 30 * time.Second
const MsgChanBufferSize = 100
const TopicHealthChanBufferSize = 100
const ConnectionChangeChanBufferSize = 100
type WakuConfig struct {
Host string `json:"host,omitempty"`
@ -430,11 +431,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
config *WakuConfig
logger *zap.Logger
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
wakuCtx unsafe.Pointer
config *WakuConfig
logger *zap.Logger
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
ConnectionChangeChan chan connectionChange
}
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
@ -469,6 +471,7 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize)
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
@ -526,6 +529,11 @@ type topicHealth struct {
TopicHealth string `json:"topicHealth"`
}
type connectionChange struct {
PeerId peer.ID `json:"peerId"`
PeerEvent string `json:"peerEvent"`
}
func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
@ -539,6 +547,8 @@ func (n *WakuNode) OnEvent(eventStr string) {
n.parseMessageEvent(eventStr)
case "relay_topic_health_change":
n.parseTopicHealthChangeEvent(eventStr)
case "connection_change":
n.parseConnectionChangeEvent(eventStr)
}
}
@ -560,6 +570,16 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) {
n.TopicHealthChan <- topicHealth
}
func (n *WakuNode) parseConnectionChangeEvent(eventStr string) {
connectionChange := connectionChange{}
err := json.Unmarshal([]byte(eventStr), &connectionChange)
if err != nil {
n.logger.Error("could not parse connection change", zap.Error(err))
}
n.ConnectionChangeChan <- connectionChange
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {

View File

@ -553,3 +553,89 @@ func TestTopicHealth(t *testing.T) {
require.NoError(t, node2.Stop())
}
func TestConnectionChange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
clusterId := uint16(16)
shardId := uint16(64)
// start node1
wakuConfig1 := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: clusterId,
Shards: []uint16{shardId},
Discv5UdpPort: 9060,
TcpPort: 60060,
}
node1, err := New(&wakuConfig1, logger.Named("node1"))
require.NoError(t, err)
require.NoError(t, node1.Start())
time.Sleep(1 * time.Second)
// start node2
wakuConfig2 := WakuConfig{
Relay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: clusterId,
Shards: []uint16{shardId},
Discv5UdpPort: 9061,
TcpPort: 60061,
}
node2, err := New(&wakuConfig2, logger.Named("node2"))
require.NoError(t, err)
require.NoError(t, node2.Start())
time.Sleep(1 * time.Second)
multiaddr2, err := node2.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, multiaddr2)
// node1 dials node2 so they become peers
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = node1.DialPeer(ctx, multiaddr2[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
peerCount1, err := node1.PeerCount()
require.NoError(t, err)
require.True(t, peerCount1 == 1, "node1 should have 1 peer")
peerCount2, err := node2.PeerCount()
require.NoError(t, err)
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
peerId1, err := node1.node.PeerID()
require.NoError(t, err)
// Wait to receive connectionChange event
select {
case connectionChange := <-node2.node.ConnectionChangeChan:
require.NotNil(t, connectionChange, "connectionChange should be updated")
require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted")
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
}
// Disconnect from node1
err = node2.node.DisconnectPeerByID(peerId1)
require.NoError(t, err)
// Wait to receive connectionChange event
select {
case connectionChange := <-node2.node.ConnectionChangeChan:
require.NotNil(t, connectionChange, "connectionChange should be updated")
require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted")
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
case <-time.After(10 * time.Second):
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
}
// Stop nodes
require.NoError(t, node1.Stop())
require.NoError(t, node2.Stop())
}