// 2024-08-19 09:30:15 +00:00
package main
import (
"chat2-reliable/pb"
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
// TestEnvironment groups the Waku nodes and chat instances that make up one
// test network. setupTestEnvironment fills both slices so that chats[i] is the
// chat created on top of nodes[i].
type TestEnvironment struct {
// nodes holds the Waku nodes of the network, in creation order.
nodes [ ] * node . WakuNode
// chats holds one Chat per node, stored at the same index as its node.
chats [ ] * Chat
}
// setupTestEnvironment builds a test network of nodeCount relay nodes.
//
// For every index i it starts a node via setupTestNode and creates a chat
// nicknamed "Node<i>" via setupTestChat. The nodes are then linked in a ring:
// node i adds node (i+1) % nodeCount as a static peer, using the first listen
// address of that peer and the relay topics configured on chat i.
//
// It returns the populated environment, or an error describing the first step
// that failed. A negative nodeCount, or a node that reports no listen
// addresses, is reported as an error instead of panicking.
func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*TestEnvironment, error) {
	t.Helper()

	if nodeCount < 0 {
		return nil, fmt.Errorf("invalid node count %d", nodeCount)
	}

	t.Logf("Setting up test environment with %d nodes", nodeCount)

	env := &TestEnvironment{
		nodes: make([]*node.WakuNode, nodeCount),
		chats: make([]*Chat, nodeCount),
	}

	for i := 0; i < nodeCount; i++ {
		// Named wakuNode so the imported node package is not shadowed.
		wakuNode, err := setupTestNode(ctx, t)
		if err != nil {
			return nil, fmt.Errorf("failed to set up node %d: %w", i, err)
		}
		env.nodes[i] = wakuNode

		chat, err := setupTestChat(ctx, wakuNode, fmt.Sprintf("Node%d", i))
		if err != nil {
			return nil, fmt.Errorf("failed to set up chat for node %d: %w", i, err)
		}
		env.chats[i] = chat
	}

	t.Log("Connecting nodes in ring topology")
	for i := 0; i < nodeCount; i++ {
		nextIndex := (i + 1) % nodeCount

		// Guard the [0] access: an empty address list would otherwise panic.
		addrs := env.nodes[nextIndex].ListenAddresses()
		if len(addrs) == 0 {
			return nil, fmt.Errorf("failed to connect node %d to node %d: node %d has no listen addresses", i, nextIndex, nextIndex)
		}

		topics := env.chats[i].options.Relay.Topics.Value()
		if _, err := env.nodes[i].AddPeer(addrs[0], peerstore.Static, topics); err != nil {
			return nil, fmt.Errorf("failed to connect node %d to node %d: %w", i, nextIndex, err)
		}
	}

	t.Log("Test environment setup complete")
	return env, nil
}
// setupTestNode creates a Waku node with the relay protocol enabled and starts
// it with ctx.
//
// Once the node has started, its Stop method is registered with t.Cleanup so
// the node is shut down when the calling test finishes instead of leaking into
// later tests. It returns the started node, or the error from node.New or
// Start.
func setupTestNode(ctx context.Context, t *testing.T) (*node.WakuNode, error) {
	t.Helper()

	opts := []node.WakuNodeOption{
		node.WithWakuRelay(),
		// node.WithWakuStore(),
	}

	// Named wakuNode so the imported node package is not shadowed.
	wakuNode, err := node.New(opts...)
	if err != nil {
		return nil, err
	}

	if err := wakuNode.Start(ctx); err != nil {
		return nil, err
	}

	// Registered only after a successful Start, so Stop never runs on a node
	// that failed to come up.
	t.Cleanup(wakuNode.Stop)

	return wakuNode, nil
}
// PeerConnection is an alias for node.PeerConnection. setupTestChat uses it as
// the element type of the connection-notification channel handed to NewChat.
type PeerConnection = node . PeerConnection
// setupTestChat creates a Chat on top of node for use in tests.
//
// The chat uses the given nickname, the content topic "/test/1/chat/proto",
// and relay enabled on relay.DefaultWakuTopic. The connection-notification
// channel passed to NewChat is created here and is never written to by this
// helper.
//
// It returns an error if the relay topic cannot be registered or if NewChat
// returns nil.
func setupTestChat(ctx context.Context, node *node.WakuNode, nickname string) (*Chat, error) {
	topics := cli.StringSlice{}
	// Set returns an error; surface it rather than silently running without a topic.
	if err := topics.Set(relay.DefaultWakuTopic); err != nil {
		return nil, fmt.Errorf("failed to set relay topic: %w", err)
	}

	options := Options{
		Nickname:     nickname,
		ContentTopic: "/test/1/chat/proto",
		Relay: RelayOptions{
			Enable: true,
			Topics: topics,
		},
	}

	// Unbuffered channel of the element type NewChat expects.
	connNotifier := make(chan PeerConnection)

	chat := NewChat(ctx, node, connNotifier, options)
	if chat == nil {
		return nil, fmt.Errorf("failed to create chat instance")
	}

	return chat, nil
}
// areNodesConnected reports whether every node in nodes currently has exactly
// expectedPeers peers on its libp2p network. An empty slice yields true.
func areNodesConnected(nodes []*node.WakuNode, expectedPeers int) bool {
	for idx := range nodes {
		peerCount := len(nodes[idx].Host().Network().Peers())
		if peerCount == expectedPeers {
			continue
		}
		// The first node with a different peer count decides the result.
		return false
	}
	return true
}
// TestLamportTimestamps verifies that Lamport timestamps are correctly updated
func TestLamportTimestamps ( t * testing . T ) {
ctx , cancel := context . WithTimeout ( context . Background ( ) , 2 * time . Minute )
defer cancel ( )
t . Log ( "Starting TestLamportTimestamps" )
nodeCount := 3
env , err := setupTestEnvironment ( ctx , t , nodeCount )
require . NoError ( t , err , "Failed to set up test environment" )
require . Eventually ( t , func ( ) bool {
return areNodesConnected ( env . nodes , 2 )
} , 30 * time . Second , 1 * time . Second , "Nodes failed to connect" )
for i , chat := range env . chats {
t . Logf ( "Node %d initial Lamport timestamp: %d" , i , chat . getLamportTimestamp ( ) )
}
t . Log ( "Sending message from Node 0" )
env . chats [ 0 ] . SendMessage ( "Message from Node 0" )
t . Log ( "Waiting for message propagation" )
require . Eventually ( t , func ( ) bool {
for _ , chat := range env . chats {
if chat . getLamportTimestamp ( ) == 0 {
return false
}
}
return true
} , 30 * time . Second , 1 * time . Second , "Message propagation failed" )
2024-09-12 09:53:57 +00:00
assert . Greater ( t , env . chats [ 0 ] . getLamportTimestamp ( ) , int32 ( 0 ) , "Sender's Lamport timestamp should be greater than 0" )
2024-08-19 09:30:15 +00:00
assert . Greater ( t , env . chats [ 1 ] . getLamportTimestamp ( ) , int32 ( 0 ) , "Node 1's Lamport timestamp should be greater than 0" )
assert . Greater ( t , env . chats [ 2 ] . getLamportTimestamp ( ) , int32 ( 0 ) , "Node 2's Lamport timestamp should be greater than 0" )
assert . NotEmpty ( t , env . chats [ 1 ] . messageHistory , "Node 1 should have received the message" )
assert . NotEmpty ( t , env . chats [ 2 ] . messageHistory , "Node 2 should have received the message" )
if len ( env . chats [ 1 ] . messageHistory ) > 0 {
assert . Equal ( t , "Message from Node 0" , env . chats [ 1 ] . messageHistory [ 0 ] . Content , "Node 1 should have received the correct message" )
}
if len ( env . chats [ 2 ] . messageHistory ) > 0 {
assert . Equal ( t , "Message from Node 0" , env . chats [ 2 ] . messageHistory [ 0 ] . Content , "Node 2 should have received the correct message" )
}
t . Log ( "TestLamportTimestamps completed successfully" )
}
// TestCausalOrdering ensures messages are processed in the correct causal order.
//
// Nodes 0, 1 and 2 each send one message, 100ms apart. Once every chat holds
// three messages, each history must list them in the order they were sent.
func TestCausalOrdering(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	t.Log("Starting TestCausalOrdering")

	// Entry k is sent by node k and is expected at history position k.
	expected := []struct {
		content string
		failMsg string
	}{
		{"Message 1 from Node 0", "Node %d: First message incorrect"},
		{"Message 2 from Node 1", "Node %d: Second message incorrect"},
		{"Message 3 from Node 2", "Node %d: Third message incorrect"},
	}

	env, err := setupTestEnvironment(ctx, t, len(expected))
	require.NoError(t, err, "Failed to set up test environment")

	allConnected := func() bool { return areNodesConnected(env.nodes, 2) }
	require.Eventually(t, allConnected, 30*time.Second, 1*time.Second, "Nodes failed to connect")

	t.Log("Sending messages from different nodes")
	for sender := range expected {
		env.chats[sender].SendMessage(expected[sender].content)
		time.Sleep(100 * time.Millisecond)
	}

	t.Log("Waiting for message propagation")
	allReceived := func() bool {
		for idx, c := range env.chats {
			t.Logf("Node %d message history length: %d", idx, len(c.messageHistory))
			if len(c.messageHistory) != len(expected) {
				return false
			}
		}
		return true
	}
	require.Eventually(t, allReceived, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")

	for idx, c := range env.chats {
		assert.Len(t, c.messageHistory, len(expected), "Node %d should have 3 messages", idx)
		for pos, want := range expected {
			assert.Equal(t, want.content, c.messageHistory[pos].Content, want.failMsg, idx)
		}
	}

	t.Log("TestCausalOrdering completed successfully")
}
// TestBloomFilterDuplicateDetection checks that handing an already-received
// message to processReceivedMessage a second time does not add a second entry
// to the receiver's history.
func TestBloomFilterDuplicateDetection(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	t.Log("Starting TestBloomFilterDuplicateDetection")

	env, err := setupTestEnvironment(ctx, t, 2)
	require.NoError(t, err, "Failed to set up test environment")

	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 1)
	}, 30*time.Second, 1*time.Second, "Nodes failed to connect")

	sender, receiver := env.chats[0], env.chats[1]

	t.Log("Sending a message")
	sender.SendMessage("Test message")

	t.Log("Waiting for message propagation")
	var receivedMsg *pb.Message
	require.Eventually(t, func() bool {
		if len(receiver.messageHistory) != 1 {
			return false
		}
		receivedMsg = receiver.messageHistory[0]
		return true
	}, 30*time.Second, 1*time.Second, "Message did not propagate to second node")

	require.NotNil(t, receivedMsg, "Received message should not be nil")

	t.Log("Simulating receiving the same message again")

	// Field-by-field copy; reusing MessageId is what makes it a true duplicate.
	duplicateMsg := new(pb.Message)
	duplicateMsg.SenderId = receivedMsg.SenderId
	duplicateMsg.MessageId = receivedMsg.MessageId
	duplicateMsg.LamportTimestamp = receivedMsg.LamportTimestamp
	duplicateMsg.CausalHistory = receivedMsg.CausalHistory
	duplicateMsg.ChannelId = receivedMsg.ChannelId
	duplicateMsg.BloomFilter = receivedMsg.BloomFilter
	duplicateMsg.Content = receivedMsg.Content

	receiver.processReceivedMessage(duplicateMsg)

	assert.Len(t, receiver.messageHistory, 1, "Node 1 should still have only one message (no duplicates)")

	t.Log("TestBloomFilterDuplicateDetection completed successfully")
}
// TestNetworkPartition ensures that missing messages can be recovered.
//
// Stages:
//  1. Nodes 0 and 1 each send a message and all three nodes receive both.
//  2. Node 2 is disconnected through the NetworkController.
//  3. Node 0 sends a message while Node 2 is disconnected.
//  4. Node 2 is reconnected and must still hold only the first two messages.
//  5. Node 1 sends a new message, which Node 2 must receive.
//  6. Node 2 must end up with four messages, the missed one at position 3.
func TestNetworkPartition(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	t.Log("Starting TestNetworkPartition")

	nodeCount := 3
	env, err := setupTestEnvironment(ctx, t, nodeCount)
	require.NoError(t, err, "Failed to set up test environment")

	nc := NewNetworkController(ctx, env.nodes, env.chats)

	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 2)
	}, 60*time.Second, 1*time.Second, "Nodes failed to connect")

	t.Log("Stage 1: Sending initial messages")
	env.chats[0].SendMessage("Message 1")
	time.Sleep(100 * time.Millisecond)
	env.chats[1].SendMessage("Message 2")
	time.Sleep(100 * time.Millisecond)

	t.Log("Waiting for message propagation")
	require.Eventually(t, func() bool {
		for _, chat := range env.chats {
			if len(chat.messageHistory) != 2 {
				return false
			}
		}
		return true
	}, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")

	// Verify that Node 2 has messages before disconnection
	require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 does not have all messages")

	t.Log("Stage 2: Simulating network partition for Node 2")
	nc.DisconnectNode(env.nodes[2])
	time.Sleep(1 * time.Second) // Allow time for disconnection to take effect

	t.Log("Stage 3: Sending message that Node 2 will miss")
	env.chats[0].SendMessage("Missed Message")
	time.Sleep(100 * time.Millisecond)

	t.Log("Stage 4: Reconnecting Node 2")
	nc.ReconnectNode(env.nodes[2])
	time.Sleep(5 * time.Second) // Allow time for reconnection to take effect

	// Verify that Node 2 didn't receive the message
	require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 should not have received the missed message")

	t.Log("Stage 5: Sending a new message that depends on the missed message")
	env.chats[1].SendMessage("New Message")

	// Verify that Node 2 received the new message
	require.Eventually(t, func() bool {
		return len(env.chats[2].messageHistory) >= 3
	}, 30*time.Second, 5*time.Second, "Node 2 should have received the new message")

	t.Log("Stage 6: Waiting for message recovery")
	require.Eventually(t, func() bool {
		return len(env.chats[2].messageHistory) == 4
	}, 30*time.Second, 5*time.Second, "Message recovery failed")

	// Print final message history for all nodes (this includes Node 2).
	for i, chat := range env.chats {
		t.Logf("Node %d final message history:", i)
		for j, msg := range chat.messageHistory {
			t.Logf(" Message %d: %s", j+1, msg.Content)
		}
	}

	// Verify the results. The wait above returned only once Node 2's history
	// had four entries, which is what the indexing below relies on.
	recovered := env.chats[2].messageHistory
	assert.Equal(t, "Message 1", recovered[0].Content, "First message incorrect")
	assert.Equal(t, "Message 2", recovered[1].Content, "Second message incorrect")
	assert.Equal(t, "Missed Message", recovered[2].Content, "Missed message not recovered")
	assert.Equal(t, "New Message", recovered[3].Content, "New message incorrect")
}
// TestConcurrentMessageSending has five nodes each send ten messages from
// their own goroutine, 10ms apart, and then waits until every node's history
// holds all fifty messages.
func TestConcurrentMessageSending(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	t.Log("Starting TestConcurrentMessageSending")

	env, err := setupTestEnvironment(ctx, t, 5)
	require.NoError(t, err, "Failed to set up test environment")

	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 2)
	}, 60*time.Second, 3*time.Second, "Nodes failed to connect")

	const messageCount = 10

	t.Log("Sending messages concurrently")
	var senders sync.WaitGroup
	for idx := range env.chats {
		senders.Add(1)
		// The index is passed as an argument so each goroutine has its own copy.
		go func(sender int) {
			defer senders.Done()
			for seq := 0; seq < messageCount; seq++ {
				text := fmt.Sprintf("Message %d from Node %d", seq, sender)
				env.chats[sender].SendMessage(text)
				time.Sleep(10 * time.Millisecond)
			}
		}(idx)
	}
	senders.Wait()

	t.Log("Waiting for message propagation")
	totalExpectedMessages := len(env.chats) * messageCount
	everyoneHasAll := func() bool {
		for _, c := range env.chats {
			if len(c.messageHistory) != totalExpectedMessages {
				return false
			}
		}
		return true
	}
	require.Eventually(t, everyoneHasAll, 2*time.Minute, 1*time.Second, "Messages did not propagate to all nodes")

	for idx, c := range env.chats {
		assert.Len(t, c.messageHistory, totalExpectedMessages, "Node %d should have received all messages", idx)
	}

	t.Log("TestConcurrentMessageSending completed successfully")
}
// TestLargeGroupScaling connects twenty nodes in a ring, sends one message from
// Node 0, and verifies that every node ends up with exactly that message.
//
// Propagation is awaited by polling rather than with a fixed sleep. This also
// means the first history entry is only read once every history is non-empty,
// so a node that received nothing fails the test instead of panicking.
func TestLargeGroupScaling(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	t.Log("Starting TestLargeGroupScaling")

	nodeCount := 20
	env, err := setupTestEnvironment(ctx, t, nodeCount)
	require.NoError(t, err, "Failed to set up test environment")

	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 2)
	}, 2*time.Minute, 3*time.Second, "Nodes failed to connect")

	// Send a message from the first node
	env.chats[0].SendMessage("Broadcast message to large group")

	// Wait until every node holds at least one message.
	require.Eventually(t, func() bool {
		for _, chat := range env.chats {
			if len(chat.messageHistory) == 0 {
				return false
			}
		}
		return true
	}, 30*time.Second, 1*time.Second, "Broadcast message did not propagate to all nodes")

	// Verify all nodes received the message, and only that message.
	for i, chat := range env.chats {
		assert.Len(t, chat.messageHistory, 1, "Node %d should have received the broadcast message", i)
		assert.Equal(t, "Broadcast message to large group", chat.messageHistory[0].Content)
	}

	t.Log("TestLargeGroupScaling completed successfully")
}
// TestEagerPushMechanism disconnects Node 1, sends a message from Node 0 while
// Node 1 is disconnected, reconnects Node 1, and expects Node 1 to end up with
// exactly one message in its history.
func TestEagerPushMechanism(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	env, err := setupTestEnvironment(ctx, t, 2)
	require.NoError(t, err, "Failed to set up test environment")

	controller := NewNetworkController(ctx, env.nodes, env.chats)
	offlineNode := env.nodes[1]

	// Take node 1 off the network before anything is sent.
	controller.DisconnectNode(offlineNode)

	// Node 0 sends while node 1 is disconnected.
	env.chats[0].SendMessage("Test eager push")

	// Give the message time to land in the outgoing buffer.
	time.Sleep(1 * time.Second)

	// Bring node 1 back.
	controller.ReconnectNode(offlineNode)

	// Leave room for eager push to resend the message.
	time.Sleep(5 * time.Second)

	// Node 1 must now hold the message.
	delivered := func() bool {
		return len(env.chats[1].messageHistory) == 1
	}
	assert.Eventually(t, delivered, 10*time.Second, 1*time.Second, "Node 1 should have received the message via eager push")
}
// TestBloomFilterWindow checks that a message ID is dropped from the
// receiver's bloom filter after the filter window has passed and Clean has
// been called, and that IDs of messages sent afterwards are still added.
//
// The test waits for the two nodes to connect before sending, and checks the
// sender's history length before indexing into it, so a failed send fails the
// test instead of panicking.
func TestBloomFilterWindow(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	nodeCount := 2
	env, err := setupTestEnvironment(ctx, t, nodeCount)
	require.NoError(t, err, "Failed to set up test environment")

	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 1)
	}, 30*time.Second, 1*time.Second, "Nodes failed to connect")

	// Reduce bloom filter window for testing
	for _, chat := range env.chats {
		chat.bloomFilter.window = 2 * time.Second
	}

	// Send a message
	env.chats[0].SendMessage("Test bloom filter window")
	require.NotEmpty(t, env.chats[0].messageHistory, "Sender should have recorded the sent message")
	messageID := env.chats[0].messageHistory[0].MessageId

	// Check if the message is in the bloom filter
	assert.Eventually(t, func() bool {
		return env.chats[1].bloomFilter.Test(messageID)
	}, 30*time.Second, 1*time.Second, "Message should be in the bloom filter")

	// Wait for the bloom filter window to pass
	time.Sleep(3 * time.Second)

	// Clean the bloom filter
	env.chats[1].bloomFilter.Clean()
	time.Sleep(3 * time.Second)

	// Check if the message is no longer in the bloom filter
	assert.False(t, env.chats[1].bloomFilter.Test(messageID), "Message should no longer be in the bloom filter")

	// Send another message to ensure the filter still works for new messages
	env.chats[0].SendMessage("New test message")
	time.Sleep(1 * time.Second)
	require.GreaterOrEqual(t, len(env.chats[0].messageHistory), 2, "Sender should have recorded the second message")
	newMessageID := env.chats[0].messageHistory[1].MessageId

	// Check if the new message is in the bloom filter
	assert.Eventually(t, func() bool {
		return env.chats[1].bloomFilter.Test(newMessageID)
	}, 30*time.Second, 1*time.Second, "New message should be in the bloom filter")
}
// TestConflictResolution feeds two messages with the same Lamport timestamp to
// Node 0 and Node 1 in opposite orders and checks that both nodes end up with
// the same ordering in their history.
//
// Both histories are checked to hold at least two entries before they are
// indexed, so a dropped message fails the test instead of panicking.
func TestConflictResolution(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	nodeCount := 3
	env, err := setupTestEnvironment(ctx, t, nodeCount)
	require.NoError(t, err, "Failed to set up test environment")

	// Create conflicting messages with the same Lamport timestamp
	conflictingMsg1 := &pb.Message{
		SenderId:         "Node0",
		MessageId:        "msg1",
		LamportTimestamp: 1,
		Content:          "Conflict 1",
	}
	conflictingMsg2 := &pb.Message{
		SenderId:         "Node1",
		MessageId:        "msg2",
		LamportTimestamp: 1,
		Content:          "Conflict 2",
	}

	// Process the conflicting messages in different orders on different nodes
	env.chats[0].processReceivedMessage(conflictingMsg1)
	env.chats[0].processReceivedMessage(conflictingMsg2)

	env.chats[1].processReceivedMessage(conflictingMsg2)
	env.chats[1].processReceivedMessage(conflictingMsg1)

	// Both nodes must have recorded both messages before positions are compared.
	for i := 0; i < 2; i++ {
		require.GreaterOrEqual(t, len(env.chats[i].messageHistory), 2, "Node %d should have recorded both conflicting messages", i)
	}

	// Check if the messages are ordered consistently across nodes
	for pos := 0; pos < 2; pos++ {
		assert.Equal(t, env.chats[0].messageHistory[pos].MessageId, env.chats[1].messageHistory[pos].MessageId, "Conflicting messages should be ordered consistently")
	}
}
// TestNewNodeSyncAndMessagePropagation checks that a node joining an existing
// network picks up earlier messages and then takes part in propagation.
//
// Two nodes exchange two messages. A third node is then started and connected
// to Node 0, and must reach a history of two messages. One further message
// from Node 0 and one from the new node must each reach all three nodes, and
// the new node's message must be the last history entry everywhere.
func TestNewNodeSyncAndMessagePropagation(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	t.Log("Starting TestNewNodeSyncAndMessagePropagation")

	// Set up initial network with 2 nodes
	initialNodeCount := 2
	env, err := setupTestEnvironment(ctx, t, initialNodeCount)
	require.NoError(t, err, "Failed to set up initial test environment")

	// Ensure initial nodes are connected
	require.Eventually(t, func() bool {
		return areNodesConnected(env.nodes, 1)
	}, 60*time.Second, 1*time.Second, "Initial nodes failed to connect")

	t.Log("Sending initial messages")
	env.chats[0].SendMessage("Initial message 1")
	env.chats[1].SendMessage("Initial message 2")

	// Wait for message propagation
	time.Sleep(5 * time.Second)

	// Verify initial messages are received by both nodes
	for i, chat := range env.chats {
		assert.Len(t, chat.messageHistory, 2, "Node %d should have 2 initial messages", i)
	}

	t.Log("Adding new node to the network")
	newNode, err := setupTestNode(ctx, t)
	require.NoError(t, err, "Failed to set up new node")

	newChat, err := setupTestChat(ctx, newNode, "NewNode")
	require.NoError(t, err, "Failed to set up new chat")

	env.nodes = append(env.nodes, newNode)
	env.chats = append(env.chats, newChat)

	// Connect new node to the network. The address list is checked first so an
	// empty list fails the test instead of panicking on the [0] access.
	bootstrapAddrs := env.nodes[0].ListenAddresses()
	require.NotEmpty(t, bootstrapAddrs, "Node 0 has no listen addresses")
	_, err = newNode.AddPeer(bootstrapAddrs[0], peerstore.Static, newChat.options.Relay.Topics.Value())
	require.NoError(t, err, "Failed to connect new node to the network")

	t.Log("Waiting for new node to sync")
	require.Eventually(t, func() bool {
		return len(newChat.messageHistory) == 2
	}, 1*time.Minute, 5*time.Second, "New node failed to sync message history")

	t.Log("Sending message from old node")
	env.chats[0].SendMessage("Message from old node")

	// Wait for message propagation
	time.Sleep(10 * time.Second)

	// Verify the message is received by all nodes
	for i, chat := range env.chats {
		assert.Len(t, chat.messageHistory, 3, "Node %d should have 3 messages", i)
	}

	t.Log("Sending message from new node")
	newChat.SendMessage("Message from new node")

	// Wait for message propagation
	time.Sleep(10 * time.Second)

	// Verify the message from new node is received by all nodes
	for i, chat := range env.chats {
		assert.Len(t, chat.messageHistory, 4, "Node %d should have 4 messages", i)
	}

	for i, chat := range env.chats {
		// An empty history would make the len-1 index below panic.
		if !assert.NotEmpty(t, chat.messageHistory, "Node %d has no messages", i) {
			continue
		}
		lastMsg := chat.messageHistory[len(chat.messageHistory)-1]
		assert.Equal(t, "Message from new node", lastMsg.Content, "The last message is incorrect for node %d", i)
	}

	t.Log("TestNewNodeSyncAndMessagePropagation completed")
}