mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-08 00:43:09 +00:00
feat: integrating connection change event (#14)
This commit is contained in:
parent
b7368a9e73
commit
477a11ff80
@ -340,6 +340,7 @@ import (
|
||||
const requestTimeout = 30 * time.Second
|
||||
const MsgChanBufferSize = 100
|
||||
const TopicHealthChanBufferSize = 100
|
||||
const ConnectionChangeChanBufferSize = 100
|
||||
|
||||
type WakuConfig struct {
|
||||
Host string `json:"host,omitempty"`
|
||||
@ -435,6 +436,7 @@ type WakuNode struct {
|
||||
logger *zap.Logger
|
||||
MsgChan chan common.Envelope
|
||||
TopicHealthChan chan topicHealth
|
||||
ConnectionChangeChan chan connectionChange
|
||||
}
|
||||
|
||||
func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
|
||||
@ -469,6 +471,7 @@ func NewWakuNode(config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
|
||||
|
||||
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
|
||||
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
|
||||
n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize)
|
||||
|
||||
// Notice that the events for self node are handled by the 'MyEventCallback' method
|
||||
C.cGoWakuSetEventCallback(n.wakuCtx)
|
||||
@ -526,6 +529,11 @@ type topicHealth struct {
|
||||
TopicHealth string `json:"topicHealth"`
|
||||
}
|
||||
|
||||
type connectionChange struct {
|
||||
PeerId peer.ID `json:"peerId"`
|
||||
PeerEvent string `json:"peerEvent"`
|
||||
}
|
||||
|
||||
func (n *WakuNode) OnEvent(eventStr string) {
|
||||
jsonEvent := jsonEvent{}
|
||||
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
|
||||
@ -539,6 +547,8 @@ func (n *WakuNode) OnEvent(eventStr string) {
|
||||
n.parseMessageEvent(eventStr)
|
||||
case "relay_topic_health_change":
|
||||
n.parseTopicHealthChangeEvent(eventStr)
|
||||
case "connection_change":
|
||||
n.parseConnectionChangeEvent(eventStr)
|
||||
}
|
||||
}
|
||||
|
||||
@ -560,6 +570,16 @@ func (n *WakuNode) parseTopicHealthChangeEvent(eventStr string) {
|
||||
n.TopicHealthChan <- topicHealth
|
||||
}
|
||||
|
||||
func (n *WakuNode) parseConnectionChangeEvent(eventStr string) {
|
||||
|
||||
connectionChange := connectionChange{}
|
||||
err := json.Unmarshal([]byte(eventStr), &connectionChange)
|
||||
if err != nil {
|
||||
n.logger.Error("could not parse connection change", zap.Error(err))
|
||||
}
|
||||
n.ConnectionChangeChan <- connectionChange
|
||||
}
|
||||
|
||||
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
|
||||
var pubsubTopic string
|
||||
if len(optPubsubTopic) == 0 {
|
||||
|
||||
@ -205,8 +205,6 @@ func TestPeerExchange(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, discV5Node.Start())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
discV5NodePeerId, err := discV5Node.PeerID()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -236,6 +234,7 @@ func TestPeerExchange(t *testing.T) {
|
||||
serverNodeMa, err := pxServerNode.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, serverNodeMa)
|
||||
require.True(t, len(serverNodeMa) > 0)
|
||||
|
||||
// Sanity check, not great, but it's probably helpful
|
||||
options := func(b *backoff.ExponentialBackOff) {
|
||||
@ -275,8 +274,6 @@ func TestPeerExchange(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, lightNode.Start())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
pxServerPeerId, err := pxServerNode.PeerID()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -332,7 +329,6 @@ func TestDnsDiscover(t *testing.T) {
|
||||
node, err := NewWakuNode(&nodeWakuConfig, logger.Named("node"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout)
|
||||
@ -380,6 +376,7 @@ func TestDial(t *testing.T) {
|
||||
receiverMultiaddr, err := receiverNode.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, receiverMultiaddr)
|
||||
require.True(t, len(receiverMultiaddr) > 0)
|
||||
// Check that both nodes start with no connected peers
|
||||
dialerPeerCount, err := dialerNode.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
@ -423,7 +420,6 @@ func TestRelay(t *testing.T) {
|
||||
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, senderNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// start node that will receive the message
|
||||
receiverNodeWakuConfig := WakuConfig{
|
||||
@ -438,10 +434,10 @@ func TestRelay(t *testing.T) {
|
||||
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, receiverNode.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
receiverMultiaddr, err := receiverNode.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, receiverMultiaddr)
|
||||
require.True(t, len(receiverMultiaddr) > 0)
|
||||
|
||||
// Dial so they become peers
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
@ -504,7 +500,6 @@ func TestTopicHealth(t *testing.T) {
|
||||
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node1.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// start node2
|
||||
wakuConfig2 := WakuConfig{
|
||||
@ -519,10 +514,10 @@ func TestTopicHealth(t *testing.T) {
|
||||
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node2.Start())
|
||||
time.Sleep(1 * time.Second)
|
||||
multiaddr2, err := node2.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, multiaddr2)
|
||||
require.True(t, len(multiaddr2) > 0)
|
||||
|
||||
// node1 dials node2 so they become peers
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
@ -553,3 +548,88 @@ func TestTopicHealth(t *testing.T) {
|
||||
require.NoError(t, node2.Stop())
|
||||
|
||||
}
|
||||
|
||||
func TestConnectionChange(t *testing.T) {
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
clusterId := uint16(16)
|
||||
shardId := uint16(64)
|
||||
|
||||
// start node1
|
||||
wakuConfig1 := WakuConfig{
|
||||
Relay: true,
|
||||
LogLevel: "DEBUG",
|
||||
Discv5Discovery: false,
|
||||
ClusterID: clusterId,
|
||||
Shards: []uint16{shardId},
|
||||
Discv5UdpPort: 9060,
|
||||
TcpPort: 60060,
|
||||
}
|
||||
|
||||
node1, err := NewWakuNode(&wakuConfig1, logger.Named("node1"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node1.Start())
|
||||
|
||||
// start node2
|
||||
wakuConfig2 := WakuConfig{
|
||||
Relay: true,
|
||||
LogLevel: "DEBUG",
|
||||
Discv5Discovery: false,
|
||||
ClusterID: clusterId,
|
||||
Shards: []uint16{shardId},
|
||||
Discv5UdpPort: 9061,
|
||||
TcpPort: 60061,
|
||||
}
|
||||
node2, err := NewWakuNode(&wakuConfig2, logger.Named("node2"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, node2.Start())
|
||||
multiaddr2, err := node2.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, multiaddr2)
|
||||
require.True(t, len(multiaddr2) > 0)
|
||||
|
||||
// node1 dials node2 so they become peers
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel()
|
||||
err = node1.Connect(ctx, multiaddr2[0])
|
||||
require.NoError(t, err)
|
||||
time.Sleep(1 * time.Second)
|
||||
// Check that both nodes now have one connected peer
|
||||
peerCount1, err := node1.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, peerCount1 == 1, "node1 should have 1 peer")
|
||||
peerCount2, err := node2.GetNumConnectedPeers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, peerCount2 == 1, "node2 should have 1 peer")
|
||||
|
||||
peerId1, err := node1.PeerID()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait to receive connectionChange event
|
||||
select {
|
||||
case connectionChange := <-node2.ConnectionChangeChan:
|
||||
require.NotNil(t, connectionChange, "connectionChange should be updated")
|
||||
require.Equal(t, connectionChange.PeerEvent, "Joined", "connectionChange Joined event should be emitted")
|
||||
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
|
||||
}
|
||||
|
||||
// Disconnect from node1
|
||||
err = node2.DisconnectPeerByID(peerId1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait to receive connectionChange event
|
||||
select {
|
||||
case connectionChange := <-node2.ConnectionChangeChan:
|
||||
require.NotNil(t, connectionChange, "connectionChange should be updated")
|
||||
require.Equal(t, connectionChange.PeerEvent, "Left", "connectionChange Left event should be emitted")
|
||||
require.Equal(t, connectionChange.PeerId, peerId1, "connectionChange event should contain node 1's peerId")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout: No connectionChange event received within 10 seconds")
|
||||
}
|
||||
|
||||
// Stop nodes
|
||||
require.NoError(t, node1.Stop())
|
||||
require.NoError(t, node2.Stop())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user