diff --git a/.github/workflows/CI_endurance.yml b/.github/workflows/CI_endurance.yml index 314bc19..16a8fe6 100644 --- a/.github/workflows/CI_endurance.yml +++ b/.github/workflows/CI_endurance.yml @@ -36,8 +36,10 @@ jobs: run: sudo sh -c "ulimit -n 8192" - name: Run Endurance Test (Group 1) + shell: bash run: | - go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressMemoryUsageForThreeNodes|TestStressStoreQuery5kMessagesWithPagination|TestStressHighThroughput10kPublish|TestStressConnectDisconnect500Iteration)$' | tee testlogs1.log + set -euo pipefail + go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressMemoryUsageForThreeNodes|TestStressStoreQuery5kMessagesWithPagination|TestStressConnectDisconnect500Iteration|TestStressHighThroughput10kPublish)$' | tee testlogs1.log - name: Upload Test Logs (Group 1) uses: actions/upload-artifact@v4 @@ -45,6 +47,12 @@ jobs: name: endurance-logs-group1 path: testlogs1.log + - name: Upload Memory Metrics (Group 1) + uses: actions/upload-artifact@v4 + with: + name: memory-metrics-group1 + path: waku/px_load_metrics.csv + endurance2: runs-on: ubuntu-latest @@ -77,11 +85,20 @@ jobs: run: sudo sh -c "ulimit -n 8192" - name: Run Endurance Test (Group 2) + + shell: bash run: | - go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressRandomNodesInMesh|TestStressLargePayloadEphemeralMessagesEndurance|TestStress2Nodes500IterationTearDown|TestPeerExchangePXLoad)$' | tee testlogs2.log + set -euo pipefail + go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressRandomNodesInMesh|TestStress2Nodes2kIterationTearDown|TestPeerExchangePXLoad)$' | tee testlogs2.log - name: Upload Test Logs (Group 2) uses: actions/upload-artifact@v4 with: name: endurance-logs-group2 path: testlogs2.log + + - name: Upload Memory Metrics (Group 2) + uses: actions/upload-artifact@v4 + with: + name: memory-metrics-group2 + path: waku/px_load_metrics.csv \ No newline at end of file diff --git a/.github/workflows/Repeated_tests_endurancce.yml b/.github/workflows/Repeated_tests_endurancce.yml index 0e51b4d..42e1aa2 100644 --- a/.github/workflows/Repeated_tests_endurancce.yml +++ b/.github/workflows/Repeated_tests_endurancce.yml @@ -5,7 +5,8 @@ on: jobs: repeated-tests: - runs-on: ubuntu-latest + runs-on: [self-hosted, ubuntu-22.04] + timeout-minutes: 900 steps: - name: Check out repository uses: actions/checkout@v3 @@ -41,7 +42,7 @@ jobs: - name: Repeated test runs run: | set +e - for i in {1..100}; do + for i in {1..80}; do echo "Iteration $i: measuring memory BEFORE the tests..." go run tools/memory_record.go --iteration $i --phase start echo "Running tests (iteration $i)..." diff --git a/utils/utils.go b/utils/utils.go index c1f03bd..0df2b6b 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -75,3 +75,4 @@ func GetRSSKB() (uint64, error) { pageSize := os.Getpagesize() return (rssPages * uint64(pageSize)) / 1024, nil } + diff --git a/waku/common/store.go b/waku/common/store.go index 066ad4e..fd73c9c 100644 --- a/waku/common/store.go +++ b/waku/common/store.go @@ -1,15 +1,15 @@ package common type StoreQueryRequest struct { - RequestId string `json:"requestId"` - IncludeData bool `json:"includeData"` + RequestId string `json:"request_id"` + IncludeData bool `json:"include_data"` PubsubTopic string `json:"pubsubTopic,omitempty"` ContentTopics *[]string `json:"contentTopics,omitempty"` TimeStart *int64 `json:"timeStart,omitempty"` TimeEnd *int64 `json:"timeEnd,omitempty"` MessageHashes *[]MessageHash `json:"messageHashes,omitempty"` PaginationCursor *MessageHash `json:"paginationCursor,omitempty"` - PaginationForward bool `json:"paginationForward"` + PaginationForward bool `json:"pagination_forward"` PaginationLimit *uint64 `json:"paginationLimit,omitempty"` } diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index a2d64a0..4da8272 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "runtime" "strconv" "sync" "time" @@ -16,6 +17,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/waku-go-bindings/utils" "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) @@ -288,3 +290,15 @@ func recordMemoryMetricsPX(testName, phase string, heapAllocKB, rssKB uint64) er } return writer.Write(row) } + +func captureMemory(testName, phase string) { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + + heapKB := ms.HeapAlloc / 1024 + rssKB, _ := utils.GetRSSKB() + + Debug("[%s] Memory usage (%s): %d KB (RSS %d KB)", testName, phase, heapKB, rssKB) + + _ = recordMemoryMetricsPX(testName, phase, heapKB, rssKB) +} \ No newline at end of file diff --git a/waku/stress_test.go b/waku/stress_test.go index b06a059..45137ae 100644 --- a/waku/stress_test.go +++ b/waku/stress_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/waku-org/waku-go-bindings/utils" "github.com/waku-org/waku-go-bindings/waku/common" // "go.uber.org/zap/zapcore" @@ -21,9 +20,7 @@ import ( func TestStressMemoryUsageForThreeNodes(t *testing.T) { testName := t.Name() var err error - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - Debug("[%s] Memory usage BEFORE creating nodes: %d KB", testName, memStats.HeapAlloc/1024) + captureMemory(t.Name(), "start") node1Cfg := DefaultWakuConfig node1Cfg.TcpPort, node1Cfg.Discv5UdpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) @@ -41,8 +38,7 @@ func TestStressMemoryUsageForThreeNodes(t *testing.T) { node3, err := NewWakuNode(&node3Cfg, "node3") require.NoError(t, err) - runtime.ReadMemStats(&memStats) - Debug("[%s] Memory usage AFTER creating nodes: %d KB", testName, memStats.HeapAlloc/1024) + captureMemory(t.Name(), "before nodes start") err = node1.Start() require.NoError(t, err) @@ -51,8 +47,7 @@ func TestStressMemoryUsageForThreeNodes(t *testing.T) { err = node3.Start() require.NoError(t, err) - runtime.ReadMemStats(&memStats) - Debug("[%s] Memory usage AFTER starting nodes: %d KB", testName, memStats.HeapAlloc/1024) + captureMemory(t.Name(), "after nodes run") time.Sleep(2 * time.Second) @@ -64,8 +59,7 @@ func TestStressMemoryUsageForThreeNodes(t *testing.T) { time.Sleep(1 * time.Second) runtime.GC() - runtime.ReadMemStats(&memStats) - Debug("[%s] Memory usage AFTER destroying nodes: %d KB", testName, memStats.HeapAlloc/1024) + captureMemory(t.Name(), "at end") Debug("[%s] Test completed successfully", testName) } @@ -91,12 +85,10 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) { node2.StopAndDestroy() }() - var memStats runtime.MemStats - iterations := 50 + + iterations := 4000 - runtime.ReadMemStats(&memStats) - initialHeapAlloc := memStats.HeapAlloc - Debug("Initial memory usage check before publishing %d MB", initialHeapAlloc/1024/1024) + captureMemory(t.Name(), "at start") queryTimestamp := proto.Int64(time.Now().UnixNano()) @@ -107,14 +99,12 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) { require.NoError(t, err, "Failed to publish message") if i%10 == 0 { - runtime.ReadMemStats(&memStats) - Debug("Memory usage at iteration %d: HeapAlloc=%v MB, NumGC=%v", - i, memStats.HeapAlloc/1024/1024, memStats.NumGC) storeQueryRequest := &common.StoreQueryRequest{ TimeStart: queryTimestamp, IncludeData: true, PaginationLimit: proto.Uint64(50), + PaginationForward: false, } storedmsgs, err := wakuNode.GetStoredMessages(node2, storeQueryRequest) @@ -123,11 +113,7 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) { } } - runtime.ReadMemStats(&memStats) - finalHeapAlloc := memStats.HeapAlloc - Debug("Memory before test: %v KB, Memory after test: %v KB", initialHeapAlloc/1024, finalHeapAlloc/1024) - - //require.LessOrEqual(t, finalHeapAlloc, initialHeapAlloc*2, "Memory usage has grown too much") + captureMemory(t.Name(), "at end") Debug("[%s] Test completed successfully", t.Name()) } @@ -149,15 +135,9 @@ func TestStressHighThroughput10kPublish(t *testing.T) { err = node1.ConnectPeer(node2) require.NoError(t, err, "Failed to connect node1 to node2") - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - startHeapKB := memStats.HeapAlloc / 1024 - startRSSKB, err := utils.GetRSSKB() - require.NoError(t, err, "Failed to read initial RSS") + captureMemory(t.Name(), "at start") - Debug("Memory usage BEFORE sending => HeapAlloc: %d KB, RSS: %d KB", startHeapKB, startRSSKB) - - totalMessages := 5000 + totalMessages := 2000 pubsubTopic := DefaultPubsubTopic for i := 0; i < totalMessages; i++ { @@ -170,21 +150,11 @@ func TestStressHighThroughput10kPublish(t *testing.T) { Debug("###Iteration number#%d", i) } - runtime.ReadMemStats(&memStats) - endHeapKB := memStats.HeapAlloc / 1024 - endRSSKB, err := utils.GetRSSKB() - require.NoError(t, err, "Failed to read final RSS") - - Debug("Memory usage AFTER sending => HeapAlloc: %d KB, RSS: %d KB", endHeapKB, endRSSKB) + captureMemory(t.Name(), "at end") } -func TestStressConnectDisconnect500Iteration(t *testing.T) { - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - startHeapKB := memStats.HeapAlloc / 1024 - startRSSKB, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("Before test: HeapAlloc = %d KB, RSS = %d KB", startHeapKB, startRSSKB) +func TestStressConnectDisconnect1kIteration(t *testing.T) { + captureMemory(t.Name(), "at start") node0Cfg := DefaultWakuConfig node0Cfg.Relay = true @@ -199,7 +169,7 @@ func TestStressConnectDisconnect500Iteration(t *testing.T) { node1.StopAndDestroy() }() - iterations := 200 + iterations := 2000 for i := 1; i <= iterations; i++ { err := node0.ConnectPeer(node1) require.NoError(t, err, "Iteration %d: node0 failed to connect to node1", i) @@ -219,18 +189,14 @@ func TestStressConnectDisconnect500Iteration(t *testing.T) { Debug("Iteration %d: node0 disconnected from node1", i) time.Sleep(2 * time.Second) } - runtime.ReadMemStats(&memStats) - endHeapKB := memStats.HeapAlloc / 1024 - endRSSKB, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("After test: HeapAlloc = %d KB, RSS = %d KB", endHeapKB, endRSSKB) + captureMemory(t.Name(), "at end") } func TestStressRandomNodesInMesh(t *testing.T) { r := rand.New(rand.NewSource(time.Now().UnixNano())) minNodes := 5 - maxNodes := 10 + maxNodes := 20 nodes := make([]*WakuNode, 0, maxNodes) for i := 0; i < minNodes; i++ { @@ -245,12 +211,7 @@ func TestStressRandomNodesInMesh(t *testing.T) { time.Sleep(1 * time.Second) require.NoError(t, err, "Failed to connect initial nodes with ConnectAllPeers") - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - startHeapKB := memStats.HeapAlloc / 1024 - startRSSKB, err2 := utils.GetRSSKB() - require.NoError(t, err2, "Failed to read initial RSS") - Debug("Memory at start of test: HeapAlloc=%d KB, RSS=%d KB", startHeapKB, startRSSKB) + captureMemory(t.Name(), "at start") testDuration := 30 * time.Minute endTime := time.Now().Add(testDuration) @@ -308,11 +269,7 @@ func TestStressRandomNodesInMesh(t *testing.T) { n.StopAndDestroy() } - runtime.ReadMemStats(&memStats) - endHeapKB := memStats.HeapAlloc / 1024 - endRSSKB, err3 := utils.GetRSSKB() - require.NoError(t, err3, "Failed to read final RSS") - Debug("Memory at end of test: HeapAlloc=%d KB, RSS=%d KB", endHeapKB, endRSSKB) + captureMemory(t.Name(), "at end") } func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) { @@ -320,30 +277,29 @@ func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) { nodePubCfg.Relay = true publisher, err := StartWakuNode("publisher", &nodePubCfg) require.NoError(t, err) - defer publisher.StopAndDestroy() nodeRecvCfg := DefaultWakuConfig nodeRecvCfg.Relay = true receiver, err := StartWakuNode("receiver", &nodeRecvCfg) require.NoError(t, err) - defer receiver.StopAndDestroy() err = receiver.RelaySubscribe(DefaultPubsubTopic) require.NoError(t, err) + defer func() { + publisher.StopAndDestroy() + time.Sleep(30 * time.Second) + receiver.StopAndDestroy() + + }() err = publisher.ConnectPeer(receiver) require.NoError(t, err) time.Sleep(2 * time.Second) - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - startHeapKB := memStats.HeapAlloc / 1024 - startRSSKB, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("Before endurance test: HeapAlloc = %d KB, RSS = %d KB", startHeapKB, startRSSKB) + captureMemory(t.Name(), "at start") - maxIterations := 2000 + maxIterations := 5000 payloadSize := 100 * 1024 largePayload := make([]byte, payloadSize) for i := range largePayload { @@ -368,25 +324,15 @@ func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) { Debug("###Iteration number %d", i+1) } - runtime.ReadMemStats(&memStats) - endHeapKB := memStats.HeapAlloc / 1024 - endRSSKB, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("After endurance test: HeapAlloc = %d KB, RSS = %d KB", endHeapKB, endRSSKB) + captureMemory(t.Name(), "at end") + } -func TestStress2Nodes500IterationTearDown(t *testing.T) { +func TestStress2Nodes2kIterationTearDown(t *testing.T) { - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - initialMem := memStats.HeapAlloc - Debug("[%s] Memory usage at test START: %d KB", t.Name(), initialMem/1024) - - initialRSS, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("[%s] OS-level RSS at test START: %d KB", t.Name(), initialRSS) - - totalIterations := 500 + captureMemory(t.Name(), "at start") + var err error + totalIterations := 2000 for i := 1; i <= totalIterations; i++ { var nodes []*WakuNode for n := 1; n <= 2; n++ { @@ -416,32 +362,16 @@ func TestStress2Nodes500IterationTearDown(t *testing.T) { runtime.GC() time.Sleep(250 * time.Millisecond) runtime.GC() - if i == 250 || i == 500 { - runtime.ReadMemStats(&memStats) - Debug("Iteration %d, usage after teardown: %d KB", i, memStats.HeapAlloc/1024) - require.LessOrEqual(t, memStats.HeapAlloc, initialMem*3, "Memory usage soared above threshold after iteration %d", i) - rssNow, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("Iteration %d, OS-level RSS after teardown: %d KB", i, rssNow) - //require.LessOrEqual(t, rssNow, initialRSS*10, "OS-level RSS soared above threshold after iteration %d", i) - } Debug("Iteration numberrrrrr %d", i) } runtime.GC() time.Sleep(500 * time.Millisecond) runtime.GC() - runtime.ReadMemStats(&memStats) - finalMem := memStats.HeapAlloc - Debug("[%s] Memory usage at test END: %d KB", t.Name(), finalMem/1024) - // require.LessOrEqual(t, finalMem, initialMem*3, "Memory usage soared above threshold after %d cycles", totalIterations) - finalRSS, err := utils.GetRSSKB() - require.NoError(t, err) - Debug("[%s] OS-level RSS at test END: %d KB", t.Name(), finalRSS) + captureMemory(t.Name(), "at end") //require.LessOrEqual(t, finalRSS, initialRSS*3, "OS-level RSS soared above threshold after %d cycles", totalIterations) } func TestPeerExchangePXLoad(t *testing.T) { - testName := "PeerExchangePXLoad" pxServerCfg := DefaultWakuConfig pxServerCfg.PeerExchange = true pxServerCfg.Relay = true @@ -464,16 +394,7 @@ func TestPeerExchangePXLoad(t *testing.T) { time.Sleep(2 * time.Second) - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - startHeapKB := memStats.HeapAlloc / 1024 - startRSSKB, err := utils.GetRSSKB() - require.NoError(t, err, "Failed to get initial RSS") - Debug("%s: Before test: HeapAlloc=%d KB, RSS=%d KB", testName, startHeapKB, startRSSKB) - - // Save the initial memory reading to CSV - err = recordMemoryMetricsPX(testName, "start", startHeapKB, startRSSKB) - require.NoError(t, err, "Failed to record start metrics") + captureMemory(t.Name(), "at start") testDuration := 30 * time.Minute endTime := time.Now().Add(testDuration) @@ -509,13 +430,5 @@ func TestPeerExchangePXLoad(t *testing.T) { time.Sleep(1 * time.Second) } - runtime.ReadMemStats(&memStats) - endHeapKB := memStats.HeapAlloc / 1024 - endRSSKB, err := utils.GetRSSKB() - require.NoError(t, err, "Failed to get final RSS") - Debug("Memory %s: After test: HeapAlloc=%d KB, RSS=%d KB", testName, endHeapKB, endRSSKB) - - // Save the final memory reading to CSV - err = recordMemoryMetricsPX(testName, "end", endHeapKB, endRSSKB) - require.NoError(t, err, "Failed to record end metrics") + captureMemory(t.Name(), "at end") }