From 818ccb265aff27fd41684de7db644ebed0387740 Mon Sep 17 00:00:00 2001 From: aya Date: Tue, 25 Mar 2025 17:26:59 +0200 Subject: [PATCH] Add new test & reduce iteratons in highthorughput test --- waku/stress_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/waku/stress_test.go b/waku/stress_test.go index 52951af..31f4111 100644 --- a/waku/stress_test.go +++ b/waku/stress_test.go @@ -5,6 +5,7 @@ package waku import ( "fmt" + "math/rand" "os" "runtime" "testing" @@ -160,7 +161,7 @@ func TestStressHighThroughput10kPublish(t *testing.T) { Debug("Memory usage BEFORE sending => HeapAlloc: %d KB, RSS: %d KB", startHeapKB, startRSSKB) - totalMessages := 10000 + totalMessages := 1000 pubsubTopic := DefaultPubsubTopic startTime := time.Now() @@ -242,6 +243,93 @@ func TestStressConnectDisconnect500Iteration(t *testing.T) { Debug("After test: HeapAlloc = %d KB, RSS = %d KB", endHeapKB, endRSSKB) } +func TestMultiNodeRelayMeshHighChurnConnectAllNoMessages(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + minNodes := 5 + maxNodes := 10 + nodes := make([]*WakuNode, 0, maxNodes) + + for i := 0; i < minNodes; i++ { + cfg := DefaultWakuConfig + cfg.Relay = true + n, err := StartWakuNode(fmt.Sprintf("node%d", i+1), &cfg) + require.NoError(t, err, "Failed to start initial node %d", i+1) + nodes = append(nodes, n) + } + + err := ConnectAllPeers(nodes) + require.NoError(t, err, "Failed to connect initial nodes with ConnectAllPeers") + + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + startHeapKB := memStats.HeapAlloc / 1024 + startRSSKB, err2 := utils.GetRSSKB() + require.NoError(t, err2, "Failed to read initial RSS") + Debug("Before high-churn mesh test: HeapAlloc=%d KB, RSS=%d KB", startHeapKB, startRSSKB) + + testDuration := 20 * time.Minute + endTime := time.Now().Add(testDuration) + + for time.Now().Before(endTime) { + action := rand.Intn(2) + + if action == 0 && len(nodes) < maxNodes { + i := len(nodes) + cfg := DefaultWakuConfig + cfg.Relay = true + newNode, err := StartWakuNode(fmt.Sprintf("node%d", i+1), &cfg) + if err == nil { + nodes = append(nodes, newNode) + err := ConnectAllPeers(nodes) + if err == nil { + Debug("Added node%d, now connecting all peers", i+1) + } else { + Debug("Failed to reconnect all peers after adding node%d: %v", i+1, errConn) + } + } else { + Debug("Failed to start new node: %v", err) + } + } else if action == 1 && len(nodes) > minNodes { + removeIndex := rand.Intn(len(nodes)) + toRemove := nodes[removeIndex] + nodes = append(nodes[:removeIndex], nodes[removeIndex+1:]...) + toRemove.StopAndDestroy() + Debug("Removed node %d from mesh", removeIndex) + if len(nodes) > 1 { + errConn := ConnectAllPeers(nodes) + if errConn == nil { + Debug("Reconnected all peers node %d", removeIndex) + } else { + Debug("Failed to reconnect all peers when removing node %d: %v", removeIndex, errConn) + } + } + } + + time.Sleep(5 * time.Second) + + for j, n := range nodes { + count, err := n.GetNumConnectedPeers() + if err != nil { + Debug("Node%d: error getting connected peers: %v", j+1, err) + } else { + Debug("Node%d sees %d connected peers", j+1, count) + } + } + + time.Sleep(3 * time.Second) + } + + for _, n := range nodes { + n.StopAndDestroy() + } + + runtime.ReadMemStats(&memStats) + endHeapKB := memStats.HeapAlloc / 1024 + endRSSKB, err3 := utils.GetRSSKB() + require.NoError(t, err3, "Failed to read final RSS") + Debug("After high-churn mesh test: HeapAlloc=%d KB, RSS=%d KB", endHeapKB, endRSSKB) +} + func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) { nodePubCfg := DefaultWakuConfig nodePubCfg.Relay = true