Merge pull request #47 from waku-org/stress_test

Endurance tests
This commit is contained in:
AYAHASSAN287 2025-04-04 11:06:47 +02:00 committed by GitHub
commit 8da16a90ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 911 additions and 22 deletions

48
.github/workflows/CI.yml vendored Normal file
View File

@ -0,0 +1,48 @@
name: Bindings Tests

on:
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

jobs:
  endurance:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Run Endurance Test
        run: |
          # Raise the fd limit in the same shell that runs the tests. The
          # previous separate step (`sudo sh -c "ulimit -n 8192"`) raised the
          # limit only inside that throwaway shell and had no effect here.
          ulimit -n 8192
          # Go's regexp engine (RE2) does not support negative lookahead, so
          # the old pattern -run '^(?!TestStress).*' failed to compile at run
          # time. Use -skip (available since Go 1.21, which this job pins) to
          # exclude the long-running stress tests instead.
          go test -p=1 -v ./waku -count=1 -timeout=360m -skip 'TestStress' | tee testlogs.log

      - name: Upload Test Logs
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs
          path: testlogs.log

87
.github/workflows/CI_endurance.yml vendored Normal file
View File

@ -0,0 +1,87 @@
name: Endurance Tests

on:
  workflow_dispatch:

jobs:
  endurance1:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Run Endurance Test (Group 1)
        run: |
          # Raise the fd limit in the shell that actually runs the tests; the
          # previous separate `sudo sh -c "ulimit -n 8192"` step only changed
          # the limit of that throwaway shell.
          ulimit -n 8192
          go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressMemoryUsageForThreeNodes|TestStressStoreQuery5kMessagesWithPagination|TestStressHighThroughput10kPublish|TestStressConnectDisconnect500Iteration)$' | tee testlogs1.log

      - name: Upload Test Logs (Group 1)
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group1
          path: testlogs1.log

  endurance2:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Run Endurance Test (Group 2)
        run: |
          # See Group 1: ulimit must be raised in the test shell itself.
          ulimit -n 8192
          go test -p=1 -v ./waku -count=1 -timeout=360m -run '^(TestStressRandomNodesInMesh|TestStressLargePayloadEphemeralMessagesEndurance|TestStress2Nodes500IterationTearDown|TestPeerExchangePXLoad)$' | tee testlogs2.log

      - name: Upload Test Logs (Group 2)
        uses: actions/upload-artifact@v4
        with:
          name: endurance-logs-group2
          path: testlogs2.log

View File

@ -0,0 +1,58 @@
name: Repeated Test Suite

on:
  workflow_dispatch:

jobs:
  repeated-tests:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
        with:
          submodules: true
          fetch-depth: 0

      - name: Initialize & update submodules
        run: git submodule update --init --recursive

      - name: Prepare third_party directory
        run: |
          sudo mkdir -p third_party
          sudo chown $USER third_party

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: "1.21"

      - name: Install Go dependencies
        run: go mod download

      - name: Build nwaku dependencies
        run: make -C waku

      - name: Clean environment
        run: go clean -cache

      - name: Initialize CSV
        run: echo "TestName,Iteration,Phase,HeapAlloc(KB),RSS(KB),Timestamp" > memory_metrics.csv

      - name: Repeated test runs
        run: |
          set +e
          for i in {1..100}; do
            echo "Iteration $i: measuring memory BEFORE the tests..."
            # NOTE(review): memory_record.go samples the memory of the `go run`
            # helper process itself, not of the test suite — confirm this is
            # the intended measurement.
            go run tools/memory_record.go --iteration $i --phase start
            echo "Running tests (iteration $i)..."
            # The previous `-tags '!stress'` is not a valid build-constraint
            # negation: the stress file is guarded by `//go:build !stress` and
            # is included in DEFAULT builds, so every iteration would have run
            # the multi-hour stress tests. Use -skip (Go 1.21+) to exclude
            # them by name instead.
            go test -v -skip 'TestStress' ./...
            echo "Iteration $i: measuring memory AFTER the tests..."
            go run tools/memory_record.go --iteration $i --phase end
          done
          set -e

      - name: Upload memory_metrics.csv
        uses: actions/upload-artifact@v4
        with:
          name: memory_metrics
          path: memory_metrics.csv

2
.gitignore vendored
View File

@ -29,3 +29,5 @@ waku/store.sqlite3
waku/store.sqlite3-shm
waku/store.sqlite3-wal
waku/test_repeated_start_stop.log

38
tools/memory_record.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"flag"
"fmt"
"os"
"runtime"
"github.com/waku-org/waku-go-bindings/utils"
)
// main parses the CLI flags, samples this process's heap allocation and RSS,
// and appends one row to memory_metrics.csv via utils.RecordMemoryMetricsCSV.
// On a metrics-write failure it exits non-zero; a failed RSS read is logged
// and recorded as 0.
//
// NOTE(review): because CI invokes this via `go run`, the values recorded are
// the memory of this short-lived helper process itself, not of the test suite
// that ran before/after it — confirm this is the intended measurement.
func main() {
	// Flags are plain locals; the previous package-level flag variables were
	// mutable global state with a single reader and no other benefit.
	var (
		testName  string
		iteration int
		phase     string
	)
	flag.StringVar(&testName, "testName", "FullTestSuite", "Name of the test ")
	flag.IntVar(&iteration, "iteration", 0, "Iteration number")
	flag.StringVar(&phase, "phase", "", "'start' or 'end')")
	flag.Parse()

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	heapKB := memStats.HeapAlloc / 1024

	rssKB, err := utils.GetRSSKB()
	if err != nil {
		// Best effort: record the row anyway with RSS=0.
		fmt.Fprintln(os.Stderr, "Failed to get RSS:", err)
		rssKB = 0
	}

	if err := utils.RecordMemoryMetricsCSV(testName, iteration, phase, heapKB, rssKB); err != nil {
		fmt.Fprintln(os.Stderr, "Error recording metrics:", err)
		os.Exit(1)
	}
}

77
utils/utils.go Normal file
View File

@ -0,0 +1,77 @@
package utils
import (
"encoding/csv"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"time"
)
var (
testName string
iteration int
phase string
mu sync.Mutex
)
func RecordMemoryMetricsCSV(testName string, iter int, phase string, heapKB, rssKB uint64) error {
mu.Lock()
defer mu.Unlock()
f, err := os.OpenFile("memory_metrics.csv", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer f.Close()
w := csv.NewWriter(f)
defer w.Flush()
stat, err := f.Stat()
if err != nil {
return err
}
if stat.Size() == 0 {
header := []string{"TestName", "Iteration", "Phase", "HeapAlloc(KB)", "RSS(KB)", "Timestamp"}
if err := w.Write(header); err != nil {
return err
}
}
row := []string{
testName,
strconv.Itoa(iter),
phase,
strconv.FormatUint(heapKB, 10),
strconv.FormatUint(rssKB, 10),
time.Now().Format(time.RFC3339),
}
return w.Write(row)
}
// GetRSSKB reports this process's resident set size in kilobytes, read from
// /proc/self/statm (Linux-specific; on other systems the open fails and the
// error is returned).
func GetRSSKB() (uint64, error) {
	statm, err := os.Open("/proc/self/statm")
	if err != nil {
		return 0, err
	}
	defer statm.Close()

	raw, err := io.ReadAll(statm)
	if err != nil {
		return 0, err
	}

	// statm layout: size resident shared text lib data dt — all in pages;
	// the resident set is the second column.
	cols := strings.Fields(string(raw))
	if len(cols) < 2 {
		return 0, fmt.Errorf("unexpected /proc/self/statm format")
	}
	residentPages, err := strconv.ParseUint(cols[1], 10, 64)
	if err != nil {
		return 0, err
	}

	// Convert pages -> bytes -> kilobytes.
	return (residentPages * uint64(os.Getpagesize())) / 1024, nil
}

View File

@ -13,9 +13,8 @@ var (
func _getLogger() *zap.SugaredLogger {
once.Do(func() {
config := zap.NewDevelopmentConfig()
config.DisableCaller = true
config.EncoderConfig.CallerKey = ""
l, err := config.Build()
if err != nil {
panic(err)

View File

@ -28,7 +28,9 @@ func TestBasicWakuNodes(t *testing.T) {
Debug("TestBasicWakuNodes completed successfully")
}
/* artifact https://github.com/waku-org/waku-go-bindings/issues/40 */
func TestNodeRestart(t *testing.T) {
t.Skip("Skipping test for open artifact ")
Debug("Starting TestNodeRestart")
Debug("Creating Node")

View File

@ -31,6 +31,7 @@ import (
// --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646
func TestBasicWaku(t *testing.T) {
t.Skip("Skipping test as choosing this port will fail the CI")
extNodeRestPort := 8646
storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort)
require.NoError(t, err)

View File

@ -2,6 +2,7 @@ package waku
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
@ -9,6 +10,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/cenkalti/backoff/v3"
@ -255,3 +257,38 @@ func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.S
Debug("Store query successful, retrieved %d messages", len(*res.Messages))
return res, nil
}
func recordMemoryMetricsPX(testName, phase string, heapAllocKB, rssKB uint64) error {
staticMu := sync.Mutex{}
staticMu.Lock()
defer staticMu.Unlock()
file, err := os.OpenFile("px_load_metrics.csv", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
stat, err := file.Stat()
if err != nil {
return err
}
if stat.Size() == 0 {
header := []string{"TestName", "Phase", "HeapAlloc(KB)", "RSS(KB)", "Timestamp"}
if err := writer.Write(header); err != nil {
return err
}
}
row := []string{
testName,
phase,
strconv.FormatUint(heapAllocKB, 10),
strconv.FormatUint(rssKB, 10),
time.Now().Format(time.RFC3339),
}
return writer.Write(row)
}

View File

@ -186,6 +186,7 @@ func TestDiscv5PeerMeshCount(t *testing.T) {
Debug("Waiting for nodes to auto-connect via Discv5")
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3})
require.NoError(t, err, "Nodes did not auto-connect within timeout")
time.Sleep(time.Second * 5)
Debug("Fetching number of peers in mesh for Node1 before stopping Node3")
peerCountBefore, err := node1.GetNumPeersInMesh(defaultPubsubTopic)

View File

@ -391,14 +391,15 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) {
node2, err := StartWakuNode("Node2", &node2Config)
require.NoError(t, err, "Failed to start Node2")
node2Address, err := node2.ListenAddresses()
require.NoError(t, err, "Failed to get listening address for Node2")
// Commented till we configure external IPs
//node2Address, err := node2.ListenAddresses()
//require.NoError(t, err, "Failed to get listening address for Node2")
node3Config := DefaultWakuConfig
node3Config.Relay = true
node3Config.Staticnodes = []string{node2Address[0].String()}
node3Config.Staticnodes = []string{node1Address[0].String()}
Debug("Creating Node3 with Node2 as a static node")
Debug("Creating Node3 with Node1 as a static node")
node3, err := StartWakuNode("Node3", &node3Config)
require.NoError(t, err, "Failed to start Node3")
@ -417,7 +418,7 @@ func TestRelaySubscribeAndPeerCountChange(t *testing.T) {
Debug("Waiting for peer connections to stabilize")
options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 10 * time.Second // Only set the max wait time
b.MaxElapsedTime = 10 * time.Second
}
require.NoError(t, RetryWithBackOff(func() error {
numPeers, err := node1.GetNumConnectedRelayPeers(defaultPubsubTopic)

View File

@ -14,7 +14,7 @@ import (
func TestStoreQuery3Nodes(t *testing.T) {
Debug("Starting test to verify store query from a peer using direct peer connections")
queryTimestamp := proto.Int64(time.Now().UnixNano())
node1Config := DefaultWakuConfig
node1Config.Relay = true
@ -53,8 +53,8 @@ func TestStoreQuery3Nodes(t *testing.T) {
require.NoError(t, err, "Failed to connect Node3 to Node2")
Debug("Waiting for peer connections to stabilize")
time.Sleep(2 * time.Second)
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3})
require.NoError(t, err, "Nodes did not connect within timeout")
Debug("Publishing message from Node1 using RelayPublish")
message := node1.CreateMessage(&pb.WakuMessage{
Payload: []byte("test-message"),
@ -62,7 +62,6 @@ func TestStoreQuery3Nodes(t *testing.T) {
Timestamp: proto.Int64(time.Now().UnixNano()),
})
queryTimestamp := proto.Int64(time.Now().UnixNano())
msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message)
require.NoError(t, err, "Failed to publish message from Node1")
@ -75,14 +74,15 @@ func TestStoreQuery3Nodes(t *testing.T) {
Debug("Node3 querying stored messages from Node2")
storeQueryRequest := &common.StoreQueryRequest{
TimeStart: queryTimestamp,
TimeStart: queryTimestamp,
IncludeData: true,
}
res, err := node3.GetStoredMessages(node2, storeQueryRequest)
var storedMessages = *res.Messages
var storedMessages = (*res.Messages)[0]
require.NoError(t, err, "Failed to retrieve stored messages from Node2")
require.NotEmpty(t, storedMessages, "Expected at least one stored message")
require.NotEmpty(t, storedMessages.WakuMessage, "Expected at least one stored message")
Debug("Verifying stored message matches the published message")
require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match")
require.Equal(t, string(message.Payload), string(storedMessages.WakuMessage.Payload), "Stored message payload does not match")
Debug("Test successfully verified store query from a peer using direct peer connections")
}
@ -117,7 +117,7 @@ func TestStoreQueryMultipleMessages(t *testing.T) {
node2.StopAndDestroy()
node3.StopAndDestroy()
}()
var timestamp = proto.Int64(time.Now().UnixNano())
Debug("Connecting Node1 to Node2")
err = node1.ConnectPeer(node2)
require.NoError(t, err, "Failed to connect Node1 to Node2")
@ -151,7 +151,14 @@ func TestStoreQueryMultipleMessages(t *testing.T) {
time.Sleep(1 * time.Second)
Debug("Node3 querying stored messages from Node2")
res, err := node3.GetStoredMessages(node2, nil)
storeRequest := &common.StoreQueryRequest{
IncludeData: true,
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(uint64(50)),
PaginationForward: true,
TimeStart: timestamp,
}
res, err := node3.GetStoredMessages(node2, storeRequest)
require.NoError(t, err, "Failed to retrieve stored messages from Node2")
require.NotNil(t, res.Messages, "Expected stored messages but received nil")
@ -282,7 +289,7 @@ func TestStoreQueryWithPaginationMultiplePages(t *testing.T) {
node2.StopAndDestroy()
node3.StopAndDestroy()
}()
var timestamp = proto.Int64(time.Now().UnixNano())
Debug("Connecting Node1 to Node2")
err = node1.ConnectPeer(node2)
require.NoError(t, err, "Failed to connect Node1 to Node2")
@ -321,6 +328,7 @@ func TestStoreQueryWithPaginationMultiplePages(t *testing.T) {
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(5),
PaginationForward: true,
TimeStart: timestamp,
}
res1, err := node3.GetStoredMessages(node2, &storeRequest1)
@ -404,6 +412,7 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) {
var sentHashes []common.MessageHash
defaultPubsubTopic := DefaultPubsubTopic
queryTimestamp := proto.Int64(time.Now().UnixNano())
Debug("Publishing %d messages from Node1 using RelayPublish", numMessages)
for i := 0; i < numMessages; i++ {
message := node1.CreateMessage(&pb.WakuMessage{
@ -416,6 +425,7 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) {
require.NoError(t, err, "Failed to publish message from Node1")
sentHashes = append(sentHashes, msgHash)
Debug("sent hash number %i is %s", i, sentHashes[i])
}
Debug("Waiting for message delivery to Node2")
@ -427,6 +437,7 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) {
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(5),
PaginationForward: false,
TimeStart: queryTimestamp,
}
res1, err := node3.GetStoredMessages(node2, &storeRequest1)
@ -435,18 +446,22 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) {
storedMessages1 := *res1.Messages
require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query")
for i := 0; i < 5; i++ {
Debug("stored hashes round 2 iteration %i is %s", i, storedMessages1[i].MessageHash)
}
for i := 0; i < 5; i++ {
require.Equal(t, sentHashes[numMessages-1-i], storedMessages1[i].MessageHash, "Message order mismatch in first query")
require.Equal(t, sentHashes[i+3], storedMessages1[i].MessageHash, "Message order mismatch in first query")
}
Debug("Node3 querying second page of stored messages from Node2")
storeRequest2 := common.StoreQueryRequest{
IncludeData: true,
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(5),
PaginationLimit: proto.Uint64(3),
PaginationForward: false,
PaginationCursor: &res1.PaginationCursor,
TimeStart: queryTimestamp,
}
res2, err := node3.GetStoredMessages(node2, &storeRequest2)
@ -457,7 +472,8 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) {
require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query")
for i := 0; i < 3; i++ {
require.Equal(t, sentHashes[numMessages-6-i], storedMessages2[i].MessageHash, "Message order mismatch in second query")
require.Equal(t, sentHashes[i], storedMessages2[i].MessageHash, "Message order mismatch in second query")
}
Debug("Test successfully verified store query pagination in reverse order")
@ -698,7 +714,7 @@ func TestStoreQueryWithWrongContentTopic(t *testing.T) {
}
storedmsgs, _ := node3.GetStoredMessages(node2, storeQueryRequest)
require.Nil(t, (*storedmsgs.Messages)[0], "Expected no messages to be returned for incorrect content topic and timestamp")
require.Empty(t, (*storedmsgs.Messages), "Expected no messages to be returned for incorrect content topic and timestamp")
Debug("Test successfully verified that store query fails when using an incorrect content topic and an old timestamp")
}
@ -999,6 +1015,7 @@ func TestStoredMessagesWithDifferentPubsubTopics(t *testing.T) {
Debug("Node1 is publishing message on pubsub topic: %s", pubsubTopic)
node1.RelaySubscribe(pubsubTopic)
node2.RelaySubscribe(pubsubTopic)
time.Sleep(time.Second * 2)
queryTimestamp := proto.Int64(time.Now().UnixNano())
var msg = node1.CreateMessage()
msgHash, err := node1.RelayPublishNoCTX(pubsubTopic, msg)

521
waku/stress_test.go Normal file
View File

@ -0,0 +1,521 @@
//go:build !stress
// +build !stress
package waku
import (
"fmt"
"math/rand"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/waku-org/waku-go-bindings/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
// "go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
)
// TestStressMemoryUsageForThreeNodes creates, starts, and destroys three waku
// nodes while logging heap allocation at each lifecycle stage (before/after
// create, after start, after destroy). It only logs; no thresholds are
// asserted.
func TestStressMemoryUsageForThreeNodes(t *testing.T) {
	testName := t.Name()

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	Debug("[%s] Memory usage BEFORE creating nodes: %d KB", testName, memStats.HeapAlloc/1024)

	// Build three nodes, each with its own freshly allocated TCP/discv5 ports.
	nodes := make([]*WakuNode, 0, 3)
	for idx := 1; idx <= 3; idx++ {
		cfg := DefaultWakuConfig
		var err error
		cfg.TcpPort, cfg.Discv5UdpPort, err = GetFreePortIfNeeded(0, 0)
		require.NoError(t, err)

		node, err := NewWakuNode(&cfg, fmt.Sprintf("node%d", idx))
		require.NoError(t, err)
		nodes = append(nodes, node)
	}

	runtime.ReadMemStats(&memStats)
	Debug("[%s] Memory usage AFTER creating nodes: %d KB", testName, memStats.HeapAlloc/1024)

	for _, node := range nodes {
		require.NoError(t, node.Start())
	}

	runtime.ReadMemStats(&memStats)
	Debug("[%s] Memory usage AFTER starting nodes: %d KB", testName, memStats.HeapAlloc/1024)

	time.Sleep(2 * time.Second)

	for _, node := range nodes {
		node.StopAndDestroy()
	}

	// Collect twice with a pause so HeapAlloc reflects live memory only.
	runtime.GC()
	time.Sleep(1 * time.Second)
	runtime.GC()

	runtime.ReadMemStats(&memStats)
	Debug("[%s] Memory usage AFTER destroying nodes: %d KB", testName, memStats.HeapAlloc/1024)
	Debug("[%s] Test completed successfully", testName)
}
// TestStressStoreQuery5kMessagesWithPagination publishes payloads on a
// relay+store node and, on every 10th iteration, runs a paginated store query
// against the peer while logging heap usage.
//
// NOTE(review): despite the "5k" in the name the loop runs 50 iterations —
// confirm the intended message volume.
func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) {
	Debug("Starting test")
	runtime.GC()

	nodeConfig := DefaultWakuConfig
	nodeConfig.Relay = true
	nodeConfig.Store = true

	// NOTE(review): both nodes share the same config pointer; confirm
	// StartWakuNode copies it (e.g. assigns distinct ports) before use.
	Debug("Creating 2 nodes")
	wakuNode, err := StartWakuNode("node1", &nodeConfig)
	require.NoError(t, err, "Failed to start Waku node")

	node2, err := StartWakuNode("node2", &nodeConfig)
	require.NoError(t, err, "Failed to start Waku node")

	// Fix: this error was previously discarded, so a failed connection made
	// every store query below fail for a misleading reason.
	err = node2.ConnectPeer(wakuNode)
	require.NoError(t, err, "Failed to connect node2 to node1")

	defer func() {
		Debug("Stopping and destroying Waku node")
		wakuNode.StopAndDestroy()
		node2.StopAndDestroy()
	}()

	var memStats runtime.MemStats
	iterations := 50
	runtime.ReadMemStats(&memStats)
	initialHeapAlloc := memStats.HeapAlloc
	Debug("Initial memory usage check before publishing %d MB", initialHeapAlloc/1024/1024)

	// Query window opens just before the first publish so every message falls
	// inside it.
	queryTimestamp := proto.Int64(time.Now().UnixNano())

	for i := 0; i < iterations; i++ {
		message := wakuNode.CreateMessage()
		message.Payload = []byte(fmt.Sprintf("Test endurance message payload %d", i))
		_, err := wakuNode.RelayPublishNoCTX(DefaultPubsubTopic, message)
		require.NoError(t, err, "Failed to publish message")

		// Every 10 messages: log memory and verify the store answers a
		// paginated query for the published window.
		if i%10 == 0 {
			runtime.ReadMemStats(&memStats)
			Debug("Memory usage at iteration %d: HeapAlloc=%v MB, NumGC=%v",
				i, memStats.HeapAlloc/1024/1024, memStats.NumGC)

			storeQueryRequest := &common.StoreQueryRequest{
				TimeStart:       queryTimestamp,
				IncludeData:     true,
				PaginationLimit: proto.Uint64(50),
			}
			storedmsgs, err := wakuNode.GetStoredMessages(node2, storeQueryRequest)
			require.NoError(t, err, "Failed to query store messages")
			require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message")
		}
	}

	runtime.ReadMemStats(&memStats)
	finalHeapAlloc := memStats.HeapAlloc
	Debug("Memory before test: %v KB, Memory after test: %v KB", initialHeapAlloc/1024, finalHeapAlloc/1024)
	//require.LessOrEqual(t, finalHeapAlloc, initialHeapAlloc*2, "Memory usage has grown too much")
	Debug("[%s] Test completed successfully", t.Name())
}
// TestStressHighThroughput10kPublish streams relay messages between two
// connected nodes for an extended period, logging heap and RSS before and
// after the run. (The loop currently sends 5000 messages at one per second.)
func TestStressHighThroughput10kPublish(t *testing.T) {
	senderCfg := DefaultWakuConfig
	senderCfg.Relay = true
	sender, err := StartWakuNode("node1", &senderCfg)
	require.NoError(t, err, "Failed to start node1")
	defer sender.StopAndDestroy()

	receiverCfg := DefaultWakuConfig
	receiverCfg.Relay = true
	receiver, err := StartWakuNode("node2", &receiverCfg)
	require.NoError(t, err, "Failed to start node2")
	defer receiver.StopAndDestroy()

	err = sender.ConnectPeer(receiver)
	require.NoError(t, err, "Failed to connect node1 to node2")

	// Baseline snapshot before any publishing.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	heapBeforeKB := memStats.HeapAlloc / 1024
	rssBeforeKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to read initial RSS")
	Debug("Memory usage BEFORE sending => HeapAlloc: %d KB, RSS: %d KB", heapBeforeKB, rssBeforeKB)

	const totalMessages = 5000
	pubsubTopic := DefaultPubsubTopic
	for n := 0; n < totalMessages; n++ {
		msg := sender.CreateMessage()
		msg.Payload = []byte(fmt.Sprintf("High-throughput message #%d", n))
		_, err := sender.RelayPublishNoCTX(pubsubTopic, msg)
		require.NoError(t, err, "Failed to publish message %d", n)
		time.Sleep(1 * time.Second)
		Debug("###Iteration number#%d", n)
	}

	// Final snapshot after the publishing loop; logged only, not asserted.
	runtime.ReadMemStats(&memStats)
	heapAfterKB := memStats.HeapAlloc / 1024
	rssAfterKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to read final RSS")
	Debug("Memory usage AFTER sending => HeapAlloc: %d KB, RSS: %d KB", heapAfterKB, rssAfterKB)
}
// TestStressConnectDisconnect500Iteration repeatedly connects node0 to node1,
// publishes one message while connected, then disconnects, logging heap and
// RSS before and after the whole run (no memory thresholds are asserted).
//
// NOTE(review): the name says 500 iterations but the loop runs 200 — confirm
// which count is intended.
func TestStressConnectDisconnect500Iteration(t *testing.T) {
	// Baseline heap/RSS before any nodes exist.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	startHeapKB := memStats.HeapAlloc / 1024
	startRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("Before test: HeapAlloc = %d KB, RSS = %d KB", startHeapKB, startRSSKB)

	node0Cfg := DefaultWakuConfig
	node0Cfg.Relay = true
	node0, err := StartWakuNode("node0", &node0Cfg)
	require.NoError(t, err)

	node1Cfg := DefaultWakuConfig
	node1Cfg.Relay = true
	node1, err := StartWakuNode("node1", &node1Cfg)
	require.NoError(t, err)

	defer func() {
		node0.StopAndDestroy()
		node1.StopAndDestroy()
	}()

	iterations := 200
	for i := 1; i <= iterations; i++ {
		// Connect and give the link a second to establish.
		err := node0.ConnectPeer(node1)
		require.NoError(t, err, "Iteration %d: node0 failed to connect to node1", i)
		time.Sleep(1 * time.Second)

		count, err := node0.GetNumConnectedPeers()
		require.NoError(t, err, "Iteration %d: failed to get peers for node0", i)
		Debug("Iteration %d: node0 sees %d connected peers", i, count)

		// Publish only when the peer is actually connected; otherwise the
		// iteration just exercises the connect/disconnect path.
		if count == 1 {
			msg := node0.CreateMessage()
			msg.Payload = []byte(fmt.Sprintf("Iteration %d: message from node0", i))
			msgHash, err := node0.RelayPublishNoCTX(DefaultPubsubTopic, msg)
			require.NoError(t, err, "Iteration %d: node0 failed to publish message", i)
			Debug("Iteration %d: node0 published message with hash %s", i, msgHash.String())
		}

		// Tear the connection down and pause before the next cycle.
		err = node0.DisconnectPeer(node1)
		require.NoError(t, err, "Iteration %d: node0 failed to disconnect from node1", i)
		Debug("Iteration %d: node0 disconnected from node1", i)
		time.Sleep(2 * time.Second)
	}

	// Final heap/RSS snapshot; logged only.
	runtime.ReadMemStats(&memStats)
	endHeapKB := memStats.HeapAlloc / 1024
	endRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("After test: HeapAlloc = %d KB, RSS = %d KB", endHeapKB, endRSSKB)
}
// TestStressRandomNodesInMesh runs a 30-minute churn test: starting from
// minNodes relay nodes, it randomly adds a node (up to maxNodes) or removes
// one (down to minNodes) each cycle, reconnecting the mesh after every change
// and logging per-node peer counts. Heap and RSS are logged at start and end;
// no thresholds are asserted.
func TestStressRandomNodesInMesh(t *testing.T) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	minNodes := 5
	maxNodes := 10
	nodes := make([]*WakuNode, 0, maxNodes)

	// Bring up the initial mesh of minNodes relay nodes.
	for i := 0; i < minNodes; i++ {
		cfg := DefaultWakuConfig
		cfg.Relay = true
		n, err := StartWakuNode(fmt.Sprintf("node%d", i+1), &cfg)
		require.NoError(t, err, "Failed to start initial node %d", i+1)
		nodes = append(nodes, n)
	}

	// NOTE(review): the ConnectAllPeers error is only checked after the
	// 1-second sleep; consider asserting before sleeping.
	err := ConnectAllPeers(nodes)
	time.Sleep(1 * time.Second)
	require.NoError(t, err, "Failed to connect initial nodes with ConnectAllPeers")

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	startHeapKB := memStats.HeapAlloc / 1024
	startRSSKB, err2 := utils.GetRSSKB()
	require.NoError(t, err2, "Failed to read initial RSS")
	Debug("Memory at start of test: HeapAlloc=%d KB, RSS=%d KB", startHeapKB, startRSSKB)

	testDuration := 30 * time.Minute
	endTime := time.Now().Add(testDuration)

	for time.Now().Before(endTime) {
		// action 0 = try to add a node, action 1 = try to remove one; each is
		// a no-op when the mesh is already at its size bound.
		action := r.Intn(2)
		if action == 0 && len(nodes) < maxNodes {
			i := len(nodes)
			cfg := DefaultWakuConfig
			cfg.Relay = true
			newNode, err := StartWakuNode(fmt.Sprintf("node%d", i+1), &cfg)
			if err == nil {
				nodes = append(nodes, newNode)
				// Re-wire the full mesh; failures are logged, not fatal,
				// so the churn loop keeps running.
				err := ConnectAllPeers(nodes)
				if err == nil {
					Debug("Added node%d, now connecting all peers", i+1)
				} else {
					Debug("Failed to reconnect all peers after adding node%d: %v", i+1, err)
				}
			} else {
				Debug("Failed to start new node: %v", err)
			}
		} else if action == 1 && len(nodes) > minNodes {
			// Pick a random victim, drop it from the slice, then destroy it.
			removeIndex := r.Intn(len(nodes))
			toRemove := nodes[removeIndex]
			nodes = append(nodes[:removeIndex], nodes[removeIndex+1:]...)
			toRemove.StopAndDestroy()
			Debug("Removed node %d from mesh", removeIndex)
			if len(nodes) > 1 {
				err := ConnectAllPeers(nodes)
				if err == nil {
					Debug("Reconnected all peers node %d", removeIndex)
				} else {
					Debug("Failed to reconnect all peers when removing node %d: %v", removeIndex, err)
				}
			}
		}

		// Let the mesh settle, then log each node's view of its peer count.
		time.Sleep(5 * time.Second)
		for j, n := range nodes {
			count, err := n.GetNumConnectedPeers()
			if err != nil {
				Debug("Node%d: error getting connected peers: %v", j+1, err)
			} else {
				Debug("Node%d sees %d connected peers", j+1, count)
			}
		}
		time.Sleep(3 * time.Second)
	}

	for _, n := range nodes {
		n.StopAndDestroy()
	}

	runtime.ReadMemStats(&memStats)
	endHeapKB := memStats.HeapAlloc / 1024
	endRSSKB, err3 := utils.GetRSSKB()
	require.NoError(t, err3, "Failed to read final RSS")
	Debug("Memory at end of test: HeapAlloc=%d KB, RSS=%d KB", endHeapKB, endRSSKB)
}
// TestStressLargePayloadEphemeralMessagesEndurance streams 100 KB ephemeral
// messages from a publisher to a subscribed receiver for many iterations,
// logging heap and RSS before and after the run. Publish failures are logged
// and counted against publishedMessages but do not fail the test.
func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) {
	pubCfg := DefaultWakuConfig
	pubCfg.Relay = true
	publisher, err := StartWakuNode("publisher", &pubCfg)
	require.NoError(t, err)
	defer publisher.StopAndDestroy()

	recvCfg := DefaultWakuConfig
	recvCfg.Relay = true
	receiver, err := StartWakuNode("receiver", &recvCfg)
	require.NoError(t, err)
	defer receiver.StopAndDestroy()

	err = receiver.RelaySubscribe(DefaultPubsubTopic)
	require.NoError(t, err)
	err = publisher.ConnectPeer(receiver)
	require.NoError(t, err)
	time.Sleep(2 * time.Second)

	// Baseline snapshot before the publishing loop.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	heapBeforeKB := memStats.HeapAlloc / 1024
	rssBeforeKB, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("Before endurance test: HeapAlloc = %d KB, RSS = %d KB", heapBeforeKB, rssBeforeKB)

	const (
		maxIterations = 2000
		payloadSize   = 100 * 1024 // 100 KB per message
	)
	largePayload := make([]byte, payloadSize)
	for idx := range largePayload {
		largePayload[idx] = 'a'
	}

	publishedMessages := 0
	for iter := 0; iter < maxIterations; iter++ {
		msg := publisher.CreateMessage()
		msg.Payload = largePayload
		ephemeral := true
		msg.Ephemeral = &ephemeral
		if _, err := publisher.RelayPublishNoCTX(DefaultPubsubTopic, msg); err != nil {
			Error("Error publishing ephemeral message: %v", err)
		} else {
			publishedMessages++
		}
		time.Sleep(1 * time.Second)
		Debug("###Iteration number %d", iter+1)
	}

	// Final snapshot; logged only, no thresholds asserted.
	runtime.ReadMemStats(&memStats)
	heapAfterKB := memStats.HeapAlloc / 1024
	rssAfterKB, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("After endurance test: HeapAlloc = %d KB, RSS = %d KB", heapAfterKB, rssAfterKB)
}
// TestStress2Nodes500IterationTearDown repeatedly builds a fresh two-node
// relay mesh, publishes one message across it, verifies delivery on the
// second node, and tears everything down — watching heap (asserted at
// iterations 250 and 500) and RSS (logged only) for leaks across cycles.
func TestStress2Nodes500IterationTearDown(t *testing.T) {
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	initialMem := memStats.HeapAlloc
	Debug("[%s] Memory usage at test START: %d KB", t.Name(), initialMem/1024)

	initialRSS, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("[%s] OS-level RSS at test START: %d KB", t.Name(), initialRSS)

	totalIterations := 500
	for i := 1; i <= totalIterations; i++ {
		// Fresh pair of nodes each cycle so any teardown leak accumulates
		// visibly across iterations.
		var nodes []*WakuNode
		for n := 1; n <= 2; n++ {
			cfg := DefaultWakuConfig
			cfg.Relay = true
			cfg.Discv5Discovery = false
			cfg.TcpPort, cfg.Discv5UdpPort, err = GetFreePortIfNeeded(0, 0)
			require.NoError(t, err, "Failed to get free ports for node%d", n)

			node, err := NewWakuNode(&cfg, fmt.Sprintf("node%d", n))
			require.NoError(t, err, "Failed to create node%d", n)
			err = node.Start()
			require.NoError(t, err, "Failed to start node%d", n)
			nodes = append(nodes, node)
		}

		err = ConnectAllPeers(nodes)
		require.NoError(t, err)

		message := nodes[0].CreateMessage()
		msgHash, err := nodes[0].RelayPublishNoCTX(DefaultPubsubTopic, message)
		require.NoError(t, err)

		time.Sleep(500 * time.Millisecond)
		// Fix: the failure message previously blamed "Node1" for not
		// receiving its own message; nodes[1] ("node2") is the receiver.
		err = nodes[1].VerifyMessageReceived(message, msgHash, 500*time.Millisecond)
		require.NoError(t, err, "Node2 did not receive message from node1")

		for _, node := range nodes {
			node.StopAndDestroy()
			time.Sleep(50 * time.Millisecond)
		}

		// Collect twice with a pause so HeapAlloc reflects live memory only.
		runtime.GC()
		time.Sleep(250 * time.Millisecond)
		runtime.GC()

		if i == 250 || i == 500 {
			runtime.ReadMemStats(&memStats)
			Debug("Iteration %d, usage after teardown: %d KB", i, memStats.HeapAlloc/1024)
			require.LessOrEqual(t, memStats.HeapAlloc, initialMem*3, "Memory usage soared above threshold after iteration %d", i)

			rssNow, err := utils.GetRSSKB()
			require.NoError(t, err)
			Debug("Iteration %d, OS-level RSS after teardown: %d KB", i, rssNow)
			//require.LessOrEqual(t, rssNow, initialRSS*10, "OS-level RSS soared above threshold after iteration %d", i)
		}
		Debug("Iteration number %d", i)
	}

	runtime.GC()
	time.Sleep(500 * time.Millisecond)
	runtime.GC()
	runtime.ReadMemStats(&memStats)
	finalMem := memStats.HeapAlloc
	Debug("[%s] Memory usage at test END: %d KB", t.Name(), finalMem/1024)
	// require.LessOrEqual(t, finalMem, initialMem*3, "Memory usage soared above threshold after %d cycles", totalIterations)

	finalRSS, err := utils.GetRSSKB()
	require.NoError(t, err)
	Debug("[%s] OS-level RSS at test END: %d KB", t.Name(), finalRSS)
	//require.LessOrEqual(t, finalRSS, initialRSS*3, "OS-level RSS soared above threshold after %d cycles", totalIterations)
}
// TestPeerExchangePXLoad runs a PX server with two relay peers for 30 minutes
// while a stream of short-lived light nodes connects, requests peers via
// peer exchange, and shuts down. Heap/RSS are recorded to px_load_metrics.csv
// at start and end; no memory thresholds are asserted.
func TestPeerExchangePXLoad(t *testing.T) {
	// NOTE(review): hardcoded name duplicates the function name — consider
	// t.Name() to keep them in sync.
	testName := "PeerExchangePXLoad"

	pxServerCfg := DefaultWakuConfig
	pxServerCfg.PeerExchange = true
	pxServerCfg.Relay = true
	pxServer, err := StartWakuNode("PXServer", &pxServerCfg)
	require.NoError(t, err, "Failed to start PX server")
	defer pxServer.StopAndDestroy()

	// NOTE(review): these pass a pointer to the shared DefaultWakuConfig
	// global rather than a local copy — confirm StartWakuNode does not
	// mutate the config it receives.
	relayA, err := StartWakuNode("RelayA", &DefaultWakuConfig)
	require.NoError(t, err, "Failed to start RelayA")
	defer relayA.StopAndDestroy()

	relayB, err := StartWakuNode("RelayB", &DefaultWakuConfig)
	require.NoError(t, err, "Failed to start RelayB")
	defer relayB.StopAndDestroy()

	err = pxServer.ConnectPeer(relayA)
	require.NoError(t, err, "PXServer failed to connect RelayA")
	err = pxServer.ConnectPeer(relayB)
	require.NoError(t, err, "PXServer failed to connect RelayB")
	time.Sleep(2 * time.Second)

	// Baseline heap/RSS snapshot before the load loop.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	startHeapKB := memStats.HeapAlloc / 1024
	startRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to get initial RSS")
	Debug("%s: Before test: HeapAlloc=%d KB, RSS=%d KB", testName, startHeapKB, startRSSKB)

	// Save the initial memory reading to CSV
	err = recordMemoryMetricsPX(testName, "start", startHeapKB, startRSSKB)
	require.NoError(t, err, "Failed to record start metrics")

	testDuration := 30 * time.Minute
	endTime := time.Now().Add(testDuration)
	lastPublishTime := time.Now().Add(-5 * time.Second) // so first publish is immediate

	for time.Now().Before(endTime) {
		// Publish a message from the PX server every 5 seconds
		// (publish errors are deliberately ignored; this is load, not a
		// delivery check).
		if time.Since(lastPublishTime) >= 5*time.Second {
			msg := pxServer.CreateMessage()
			msg.Payload = []byte("PX server message stream")
			_, _ = pxServer.RelayPublishNoCTX(DefaultPubsubTopic, msg)
			lastPublishTime = time.Now()
		}

		// Create a light node that relies on PX, run for 3s
		lightCfg := DefaultWakuConfig
		lightCfg.Relay = false
		lightCfg.Store = false
		lightCfg.PeerExchange = true
		lightNode, err := StartWakuNode("LightNode", &lightCfg)
		if err == nil {
			errPX := lightNode.ConnectPeer(pxServer)
			if errPX == nil {
				// Request peers from PX server
				_, _ = lightNode.PeerExchangeRequest(2)
			}
			time.Sleep(3 * time.Second)
			lightNode.StopAndDestroy()
		} else {
			// Startup failures are logged and the loop simply retries with
			// a fresh light node next cycle.
			Debug("Failed to start light node: %v", err)
		}
		time.Sleep(1 * time.Second)
	}

	runtime.ReadMemStats(&memStats)
	endHeapKB := memStats.HeapAlloc / 1024
	endRSSKB, err := utils.GetRSSKB()
	require.NoError(t, err, "Failed to get final RSS")
	Debug("Memory %s: After test: HeapAlloc=%d KB, RSS=%d KB", testName, endHeapKB, endRSSKB)

	// Save the final memory reading to CSV
	err = recordMemoryMetricsPX(testName, "end", endHeapKB, endRSSKB)
	require.NoError(t, err, "Failed to record end metrics")
}