# ---------------------------------------------------------------------------
# .github/workflows/CI.yml -- only the changed step is visible in this patch;
# the rest of that workflow is outside the hunk and is left untouched.
# ---------------------------------------------------------------------------
      - name: Run Endurance Test
        run: |
          # Steps without an explicit `shell:` run under `bash -e` with no
          # pipefail, so tee's exit status would mask a failing `go test`.
          set -o pipefail
          # Go's regexp package (RE2 syntax) has no lookahead, so the pattern
          # '^(?!TestStress).*' is rejected as an invalid -run expression and no
          # test runs at all. Exclude the stress tests with -skip instead
          # (requires Go 1.20+; the endurance workflow below pins Go 1.21).
          # NOTE(review): TestPeerExchangePXLoad does not start with
          # "TestStress", so it still runs here as well as in endurance group 2;
          # add it to the -skip pattern if that is not intended.
          go test -p=1 -v ./waku -count=1 -timeout=360m -skip '^TestStress' | tee testlogs.log
---
# ---------------------------------------------------------------------------
# .github/workflows/CI_endurance.yml
#
# Runs the long-running stress tests as two parallel jobs so that each group
# gets its own runner and its own log artifact.
# ---------------------------------------------------------------------------
name: Endurance Tests

on:
  push:
    branches: [ "stress_test" ]

jobs:
  endurance1:
    runs-on: ubuntu-latest

    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Run Endurance Test (Group 1)
        run: |
          # Propagate a go test failure through the pipe to tee.
          set -o pipefail
          # The limit must be raised in the same shell that launches the tests;
          # a separate `sudo sh -c "ulimit ..."` step only changes a subshell
          # that exits immediately.
          ulimit -n 8192
          go test -p=1 ./waku -count=1 -timeout=360m -run '^(TestStressMemoryUsageForThreeNodes|TestStressStoreQuery5kMessagesWithPagination|TestStressHighThroughput10kPublish|TestStressConnectDisconnect500Iteration)$' | tee testlogs1.log

      - name: Upload Test Logs (Group 1)
        # Logs matter most when the tests failed, so upload regardless of status.
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group1
          path: testlogs1.log

  endurance2:
    runs-on: ubuntu-latest

    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Run Endurance Test (Group 2)
        run: |
          # Propagate a go test failure through the pipe to tee.
          set -o pipefail
          # Raise the open-file limit in the shell that actually runs the tests.
          ulimit -n 8192
          go test -p=1 ./waku -count=1 -timeout=360m -run '^(TestStressRandomNodesInMesh|TestStressLargePayloadEphemeralMessagesEndurance|TestStress2Nodes500IterationTearDown|TestPeerExchangePXLoad)$' | tee testlogs2.log

      - name: Upload Test Logs (Group 2)
        # Logs matter most when the tests failed, so upload regardless of status.
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group2
          path: testlogs2.log
// pxMetricsMu serializes calls to recordMemoryMetricsPX within this process so
// that the "is the file still empty?" check and the header/row append that
// follows it cannot interleave with another caller.
var pxMetricsMu sync.Mutex

// recordMemoryMetricsPX appends one memory sample to px_load_metrics.csv in the
// current working directory. The file is created if it does not exist, and a
// header row is written first whenever the file is empty.
//
// testName and phase are written verbatim; heapAllocKB and rssKB are written as
// base-10 integers; the final column is the current time in RFC 3339 format.
//
// It returns the first error encountered while opening, inspecting, writing,
// flushing, or closing the file.
func recordMemoryMetricsPX(testName, phase string, heapAllocKB, rssKB uint64) (err error) {
	// The mutex must outlive a single call: a mutex declared inside the function
	// is fresh on every invocation and therefore never excludes anyone.
	pxMetricsMu.Lock()
	defer pxMetricsMu.Unlock()

	file, err := os.OpenFile("px_load_metrics.csv", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// Report a close failure unless an earlier error is already being returned.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	stat, err := file.Stat()
	if err != nil {
		return err
	}

	writer := csv.NewWriter(file)
	if stat.Size() == 0 {
		header := []string{"TestName", "Phase", "HeapAlloc(KB)", "RSS(KB)", "Timestamp"}
		if err := writer.Write(header); err != nil {
			return err
		}
	}

	row := []string{
		testName,
		phase,
		strconv.FormatUint(heapAllocKB, 10),
		strconv.FormatUint(rssKB, 10),
		time.Now().Format(time.RFC3339),
	}
	if err := writer.Write(row); err != nil {
		return err
	}

	// csv.Writer buffers its output, so most I/O failures only surface on Flush;
	// Error reports whatever Write or Flush ran into.
	writer.Flush()
	return writer.Error()
}
duration) - Debug("Total time per message ~ %v", duration/time.Duration(totalMessages)) } func TestStressConnectDisconnect500Iteration(t *testing.T) { @@ -457,3 +453,83 @@ func TestStress2Nodes500IterationTearDown(t *testing.T) { Debug("[%s] OS-level RSS at test END: %d KB", t.Name(), finalRSS) //require.LessOrEqual(t, finalRSS, initialRSS*3, "OS-level RSS soared above threshold after %d cycles", totalIterations) } + +func TestPeerExchangePXLoad(t *testing.T) { + testName := "PeerExchangePXLoad" + pxServerCfg := DefaultWakuConfig + pxServerCfg.PeerExchange = true + pxServerCfg.Relay = true + pxServer, err := StartWakuNode("PXServer", &pxServerCfg) + require.NoError(t, err, "Failed to start PX server") + defer pxServer.StopAndDestroy() + + relayA, err := StartWakuNode("RelayA", &DefaultWakuConfig) + require.NoError(t, err, "Failed to start RelayA") + defer relayA.StopAndDestroy() + + relayB, err := StartWakuNode("RelayB", &DefaultWakuConfig) + require.NoError(t, err, "Failed to start RelayB") + defer relayB.StopAndDestroy() + + err = pxServer.ConnectPeer(relayA) + require.NoError(t, err, "PXServer failed to connect RelayA") + err = pxServer.ConnectPeer(relayB) + require.NoError(t, err, "PXServer failed to connect RelayB") + + time.Sleep(2 * time.Second) + + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + startHeapKB := memStats.HeapAlloc / 1024 + startRSSKB, err := utils.GetRSSKB() + require.NoError(t, err, "Failed to get initial RSS") + Debug("%s: Before test: HeapAlloc=%d KB, RSS=%d KB", testName, startHeapKB, startRSSKB) + + // Save the initial memory reading to CSV + err = recordMemoryMetricsPX(testName, "start", startHeapKB, startRSSKB) + require.NoError(t, err, "Failed to record start metrics") + + testDuration := 30 * time.Minute + endTime := time.Now().Add(testDuration) + + lastPublishTime := time.Now().Add(-5 * time.Second) // so first publish is immediate + for time.Now().Before(endTime) { + // Publish a message from the PX server every 5 
seconds + if time.Since(lastPublishTime) >= 5*time.Second { + msg := pxServer.CreateMessage() + msg.Payload = []byte("PX server message stream") + _, _ = pxServer.RelayPublishNoCTX(DefaultPubsubTopic, msg) + lastPublishTime = time.Now() + } + + // Create a light node that relies on PX, run for 3s + lightCfg := DefaultWakuConfig + lightCfg.Relay = false + lightCfg.Store = false + lightCfg.PeerExchange = true + lightNode, err := StartWakuNode("LightNode", &lightCfg) + if err == nil { + errPX := lightNode.ConnectPeer(pxServer) + if errPX == nil { + // Request peers from PX server + _, _ = lightNode.PeerExchangeRequest(2) + } + time.Sleep(3 * time.Second) + lightNode.StopAndDestroy() + } else { + Debug("Failed to start light node: %v", err) + } + + time.Sleep(1 * time.Second) + } + + runtime.ReadMemStats(&memStats) + endHeapKB := memStats.HeapAlloc / 1024 + endRSSKB, err := utils.GetRSSKB() + require.NoError(t, err, "Failed to get final RSS") + Debug("Memory %s: After test: HeapAlloc=%d KB, RSS=%d KB", testName, endHeapKB, endRSSKB) + + // Save the final memory reading to CSV + err = recordMemoryMetricsPX(testName, "end", endHeapKB, endRSSKB) + require.NoError(t, err, "Failed to record end metrics") +}