split CI jobs to speed it up

This commit is contained in:
aya 2025-03-26 12:41:36 +02:00
parent 6cfa13a21c
commit 18e887d94c
5 changed files with 208 additions and 7 deletions

View File

@ -38,7 +38,7 @@ jobs:
- name: Run Endurance Test
run: |
go test -p=1 -v ./waku -count=1 -timeout=360m -run '^TestStress' | tee testlogs.log
go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(?!TestStress).*' | tee testlogs.log
- name: Upload Test Logs
uses: actions/upload-artifact@v4

88
.github/workflows/CI_endurance.yml vendored Normal file
View File

@ -0,0 +1,88 @@
# Endurance test workflow.
#
# The stress tests are split into two independent jobs so they run in parallel
# on separate runners. Each job checks out the repository with submodules,
# builds the nwaku dependencies, runs its group of tests and uploads the log.
name: Endurance Tests
on:
  push:
    branches: [ "stress_test" ]
jobs:
  endurance1:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true
      - name: Initialize & update submodules
        run: git submodule update --init --recursive
      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party
      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"
      - name: Install Go dependencies
        run: go mod download
      - name: Build nwaku dependencies
        run: make -C waku
      - name: Run Endurance Test (Group 1)
        # An explicit `shell: bash` makes the runner invoke bash with
        # `-eo pipefail`, so a failing `go test` fails the step instead of
        # being masked by the exit status of `tee`.
        shell: bash
        run: |
          # ulimit only affects the current shell and its children, so it has
          # to be raised here rather than in a separate step. Only raise it;
          # never lower a limit that is already high enough.
          current_limit=$(ulimit -n)
          if [ "$current_limit" != "unlimited" ] && [ "$current_limit" -lt 8192 ]; then
            ulimit -n 8192
          fi
          go test -p=1 ./waku -count=1 -timeout=360m -run '^(TestStressMemoryUsageForThreeNodes|TestStressStoreQuery5kMessagesWithPagination|TestStressHighThroughput10kPublish|TestStressConnectDisconnect500Iteration)$' | tee testlogs1.log
      - name: Upload Test Logs (Group 1)
        # Upload the log even when the tests failed; that is when it is needed most.
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group1
          path: testlogs1.log
  endurance2:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true
      - name: Initialize & update submodules
        run: git submodule update --init --recursive
      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party
      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"
      - name: Install Go dependencies
        run: go mod download
      - name: Build nwaku dependencies
        run: make -C waku
      - name: Run Endurance Test (Group 2)
        # An explicit `shell: bash` makes the runner invoke bash with
        # `-eo pipefail`, so a failing `go test` fails the step instead of
        # being masked by the exit status of `tee`.
        shell: bash
        run: |
          # ulimit only affects the current shell and its children, so it has
          # to be raised here rather than in a separate step. Only raise it;
          # never lower a limit that is already high enough.
          current_limit=$(ulimit -n)
          if [ "$current_limit" != "unlimited" ] && [ "$current_limit" -lt 8192 ]; then
            ulimit -n 8192
          fi
          go test -p=1 ./waku -count=1 -timeout=360m -run '^(TestStressRandomNodesInMesh|TestStressLargePayloadEphemeralMessagesEndurance|TestStress2Nodes500IterationTearDown|TestPeerExchangePXLoad)$' | tee testlogs2.log
      - name: Upload Test Logs (Group 2)
        # Upload the log even when the tests failed; that is when it is needed most.
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group2
          path: testlogs2.log

View File

@ -2,6 +2,7 @@ package waku
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
@ -9,6 +10,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/cenkalti/backoff/v3"
@ -255,3 +257,38 @@ func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.S
Debug("Store query successful, retrieved %d messages", len(*res.Messages))
return res, nil
}
// pxMetricsMu serializes appends to px_load_metrics.csv within this process.
// It must live at package scope: a mutex created inside the function would be
// a fresh, unshared lock on every call and would protect nothing.
var pxMetricsMu sync.Mutex

// recordMemoryMetricsPX appends one row of memory measurements to
// px_load_metrics.csv in the current working directory, creating the file if
// needed and writing a header row first when the file is empty.
//
// testName and phase are written verbatim; heapAllocKB and rssKB are written
// as base-10 integers; the last column is the current time in RFC 3339 format.
//
// It returns the first error encountered while opening, inspecting, writing,
// flushing or closing the file. Calls from multiple goroutines are serialized;
// no coordination is attempted across processes.
func recordMemoryMetricsPX(testName, phase string, heapAllocKB, rssKB uint64) (err error) {
	pxMetricsMu.Lock()
	defer pxMetricsMu.Unlock()

	file, err := os.OpenFile("px_load_metrics.csv", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	// Surface a Close failure unless an earlier error is already being returned.
	defer func() {
		if cerr := file.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	stat, err := file.Stat()
	if err != nil {
		return err
	}

	writer := csv.NewWriter(file)

	// An empty file has no header yet; the size check is safe because the
	// package-level mutex is held for the whole open/check/write sequence.
	if stat.Size() == 0 {
		header := []string{"TestName", "Phase", "HeapAlloc(KB)", "RSS(KB)", "Timestamp"}
		if err := writer.Write(header); err != nil {
			return err
		}
	}

	row := []string{
		testName,
		phase,
		strconv.FormatUint(heapAllocKB, 10),
		strconv.FormatUint(rssKB, 10),
		time.Now().Format(time.RFC3339),
	}
	if err := writer.Write(row); err != nil {
		return err
	}

	// csv.Writer buffers output; Flush does not return an error, so the
	// outcome of the underlying write has to be read back via Error.
	writer.Flush()
	return writer.Error()
}

View File

@ -161,18 +161,17 @@ func TestStressHighThroughput10kPublish(t *testing.T) {
Debug("Memory usage BEFORE sending => HeapAlloc: %d KB, RSS: %d KB", startHeapKB, startRSSKB)
totalMessages := 1000
totalMessages := 5000
pubsubTopic := DefaultPubsubTopic
startTime := time.Now()
for i := 0; i < totalMessages; i++ {
message := node1.CreateMessage()
message.Payload = []byte(fmt.Sprintf("High-throughput message #%d", i))
_, err := node1.RelayPublishNoCTX(pubsubTopic, message)
require.NoError(t, err, "Failed to publish message %d", i)
time.Sleep(500 * time.Millisecond)
}
duration := time.Since(startTime)
runtime.ReadMemStats(&memStats)
endHeapKB := memStats.HeapAlloc / 1024
@ -180,9 +179,6 @@ func TestStressHighThroughput10kPublish(t *testing.T) {
require.NoError(t, err, "Failed to read final RSS")
Debug("Memory usage AFTER sending => HeapAlloc: %d KB, RSS: %d KB", endHeapKB, endRSSKB)
Debug("Published %d messages in %s", totalMessages, duration)
Debug("Total time per message ~ %v", duration/time.Duration(totalMessages))
}
func TestStressConnectDisconnect500Iteration(t *testing.T) {
@ -457,3 +453,83 @@ func TestStress2Nodes500IterationTearDown(t *testing.T) {
Debug("[%s] OS-level RSS at test END: %d KB", t.Name(), finalRSS)
//require.LessOrEqual(t, finalRSS, initialRSS*3, "OS-level RSS soared above threshold after %d cycles", totalIterations)
}
// TestPeerExchangePXLoad puts a peer-exchange server under sustained load for
// 30 minutes and records memory usage before and after.
//
// Setup: one node with PeerExchange and Relay enabled acts as the PX server
// and is connected to two nodes started with the default configuration. During
// the run the server publishes a message roughly every 5 seconds while a
// short-lived node (Relay and Store disabled, PeerExchange enabled) is
// repeatedly started, connected to the server, asked to request 2 peers, kept
// alive for 3 seconds and then destroyed.
//
// Heap and RSS readings taken at the start and end are logged and appended to
// the CSV written by recordMemoryMetricsPX. Failures inside the load loop are
// logged but do not fail the test; only setup and metric collection do.
func TestPeerExchangePXLoad(t *testing.T) {
	testName := "PeerExchangePXLoad"

	pxServerCfg := DefaultWakuConfig
	pxServerCfg.PeerExchange = true
	pxServerCfg.Relay = true
	pxServer, err := StartWakuNode("PXServer", &pxServerCfg)
	require.NoError(t, err, "Failed to start PX server")
	defer pxServer.StopAndDestroy()

	relayA, err := StartWakuNode("RelayA", &DefaultWakuConfig)
	require.NoError(t, err, "Failed to start RelayA")
	defer relayA.StopAndDestroy()

	relayB, err := StartWakuNode("RelayB", &DefaultWakuConfig)
	require.NoError(t, err, "Failed to start RelayB")
	defer relayB.StopAndDestroy()

	err = pxServer.ConnectPeer(relayA)
	require.NoError(t, err, "PXServer failed to connect RelayA")
	err = pxServer.ConnectPeer(relayB)
	require.NoError(t, err, "PXServer failed to connect RelayB")

	// Give the connections a moment to settle before taking the baseline.
	time.Sleep(2 * time.Second)

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	startHeapKB := memStats.HeapAlloc / 1024
	startRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to get initial RSS")
	Debug("%s: Before test: HeapAlloc=%d KB, RSS=%d KB", testName, startHeapKB, startRSSKB)

	// Save the initial memory reading to CSV
	err = recordMemoryMetricsPX(testName, "start", startHeapKB, startRSSKB)
	require.NoError(t, err, "Failed to record start metrics")

	testDuration := 30 * time.Minute
	endTime := time.Now().Add(testDuration)
	lastPublishTime := time.Now().Add(-5 * time.Second) // so first publish is immediate

	for time.Now().Before(endTime) {
		// Publish a message from the PX server every 5 seconds
		if time.Since(lastPublishTime) >= 5*time.Second {
			msg := pxServer.CreateMessage()
			msg.Payload = []byte("PX server message stream")
			// Publishing is best-effort background traffic: log a failure
			// rather than abort the load run.
			if _, errPub := pxServer.RelayPublishNoCTX(DefaultPubsubTopic, msg); errPub != nil {
				Debug("%s: PX server failed to publish: %v", testName, errPub)
			}
			lastPublishTime = time.Now()
		}

		// Create a light node that relies on PX, run for 3s
		lightCfg := DefaultWakuConfig
		lightCfg.Relay = false
		lightCfg.Store = false
		lightCfg.PeerExchange = true

		lightNode, err := StartWakuNode("LightNode", &lightCfg)
		if err != nil {
			Debug("Failed to start light node: %v", err)
		} else {
			// Connection and PX request failures are logged so a run in which
			// every request fails can be told apart from a healthy one.
			if errPX := lightNode.ConnectPeer(pxServer); errPX != nil {
				Debug("%s: light node failed to connect to PX server: %v", testName, errPX)
			} else if _, errReq := lightNode.PeerExchangeRequest(2); errReq != nil {
				// Request peers from PX server
				Debug("%s: light node peer exchange request failed: %v", testName, errReq)
			}
			time.Sleep(3 * time.Second)
			lightNode.StopAndDestroy()
		}

		time.Sleep(1 * time.Second)
	}

	runtime.ReadMemStats(&memStats)
	endHeapKB := memStats.HeapAlloc / 1024
	endRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to get final RSS")
	Debug("Memory %s: After test: HeapAlloc=%d KB, RSS=%d KB", testName, endHeapKB, endRSSKB)

	// Save the final memory reading to CSV
	err = recordMemoryMetricsPX(testName, "end", endHeapKB, endRSSKB)
	require.NoError(t, err, "Failed to record end metrics")
}