diff --git a/examples/chat2-reliable/chat.go b/examples/chat2-reliable/chat.go index 1466431b..689a4a46 100644 --- a/examples/chat2-reliable/chat.go +++ b/examples/chat2-reliable/chat.go @@ -172,6 +172,9 @@ func (c *Chat) receiveMessages() { func (c *Chat) parseInput() { defer c.wg.Done() + + var disconnectedPeers []peer.ID + for { select { case <-c.ctx.Done(): @@ -267,10 +270,32 @@ func (c *Chat) parseInput() { /connect multiaddress - dials a node adding it to the list of connected peers /peers - list of peers connected to this node /nick newNick - change the user's nickname + /disconnect - disconnect from all currently connected peers + /reconnect - attempt to reconnect to previously disconnected peers /exit - closes the app`) return } + // Disconnect from peers + if line == "/disconnect" { + disconnectedPeers = c.disconnectFromPeers() + c.ui.InfoMessage("Disconnected from all peers. Use /reconnect to reconnect.") + return + } + + // Reconnect to peers + if line == "/reconnect" { + if len(disconnectedPeers) == 0 { + c.ui.InfoMessage("No disconnection active. Use /disconnect first.") + } else { + c.reconnectToPeers(disconnectedPeers) + disconnectedPeers = nil + c.ui.InfoMessage("Reconnection initiated.") + } + return + } + + // If no command matched, send as a regular message c.SendMessage(line) }() } @@ -521,6 +546,29 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) { } } +func (c *Chat) disconnectFromPeers() []peer.ID { + disconnectedPeers := c.node.Host().Network().Peers() + for _, peerID := range disconnectedPeers { + c.node.Host().Network().ClosePeer(peerID) + } + return disconnectedPeers +} + +func (c *Chat) reconnectToPeers(peers []peer.ID) { + for _, peerID := range peers { + // We're using a goroutine here to avoid blocking if a peer is unreachable + go func(p peer.ID) { + ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) + defer cancel() + if _, err := c.node.Host().Network().DialPeer(ctx, p); err != nil { + c.ui.ErrorMessage(fmt.Errorf("failed to reconnect to peer %s: %w", p, err)) + } else { + c.ui.InfoMessage(fmt.Sprintf("Successfully reconnected to peer %s", p)) + } + }(peerID) + } +} + func generateUniqueID() string { return uuid.New().String() } diff --git a/examples/chat2-reliable/chat_reliability_test.go b/examples/chat2-reliable/chat_reliability_test.go index 510fd121..86d9c098 100644 --- a/examples/chat2-reliable/chat_reliability_test.go +++ b/examples/chat2-reliable/chat_reliability_test.go @@ -141,7 +141,7 @@ func TestLamportTimestamps(t *testing.T) { return true }, 30*time.Second, 1*time.Second, "Message propagation failed") - assert.Equal(t, int32(1), env.chats[0].getLamportTimestamp(), "Sender's Lamport timestamp should be 1") + assert.Greater(t, env.chats[0].getLamportTimestamp(), int32(0), "Sender's Lamport timestamp should be greater than 0") assert.Greater(t, env.chats[1].getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0") assert.Greater(t, env.chats[2].getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0") diff --git a/examples/chat2-reliable/reliability.go b/examples/chat2-reliable/reliability.go index 8fa821a8..9e8aa832 100644 --- a/examples/chat2-reliable/reliability.go +++ b/examples/chat2-reliable/reliability.go @@ -5,6 +5,9 @@ import ( "context" "errors" "fmt" + "log" + "os" + "strings" "time" "github.com/bits-and-blooms/bloom/v3" @@ -18,15 +21,17 @@ const ( bufferSweepInterval = 5 * time.Second syncMessageInterval = 30 * time.Second messageAckTimeout = 60 * time.Second - maxRetries = 3 - retryBaseDelay = 1 * time.Second - maxRetryDelay = 10 * time.Second + maxRetries = 5 + retryBaseDelay = 3 * time.Second + maxRetryDelay = 30 * time.Second ackTimeout = 5 * time.Second maxResendAttempts = 5 resendBaseDelay = 1 * time.Second maxResendDelay = 30 * time.Second ) +var reliabilityLogger *log.Logger + func (c *Chat) initReliabilityProtocol() { c.wg.Add(4) c.setupMessageRequestHandler() @@ -37,6 +42,18 @@ func (c *Chat) initReliabilityProtocol() { go c.startEagerPushMechanism() } +func init() { + file, err := os.OpenFile("reliability.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Fatal(err) + } + reliabilityLogger = log.New(file, "", log.LstdFlags) +} + +func (c *Chat) logReliabilityEvent(message string) { + reliabilityLogger.Println(message) +} + func (c *Chat) startEagerPushMechanism() { defer c.wg.Done() @@ -140,6 +157,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) { c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content) // Add to message history c.addToMessageHistory(msg) + c.logReliabilityEvent(fmt.Sprintf("Processed message %s with Lamport timestamp %d", msg.MessageId, msg.LamportTimestamp)) } // Process any messages in the buffer that now have their dependencies met @@ -151,6 +169,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) { } // Add to incoming buffer c.addToIncomingBuffer(msg) + c.logReliabilityEvent(fmt.Sprintf("Message %s buffered due to missing dependencies: %v", msg.MessageId, missingDeps)) } } @@ -218,6 +237,7 @@ func (c *Chat) requestMissingMessage(messageID string) { missedMsg, err := c.doRequestMissingMessageFromPeers(messageID) if err == nil { c.processReceivedMessage(missedMsg) + c.logReliabilityEvent(fmt.Sprintf("Successfully retrieved missing message %s", messageID)) return } @@ -229,7 +249,7 @@ func (c *Chat) requestMissingMessage(messageID string) { time.Sleep(delay) } - c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message %s after %d attempts", messageID, maxRetries)) + c.logReliabilityEvent(fmt.Sprintf("Failed to retrieve missing message %s after %d attempts", messageID, maxRetries)) } func (c *Chat) checkCausalDependencies(msg *pb.Message) []string { @@ -279,6 +299,21 @@ func (c *Chat) addToMessageHistory(msg *pb.Message) { if len(c.messageHistory) > maxMessageHistory { c.messageHistory = c.messageHistory[len(c.messageHistory)-maxMessageHistory:] } + + c.logReliabilityEvent(fmt.Sprintf("Added message %s to history at position %d with Lamport timestamp %d", msg.MessageId, insertIndex, msg.LamportTimestamp)) + + // Log the entire message history + c.logMessageHistory() +} + +func (c *Chat) logMessageHistory() { + var historyLog strings.Builder + historyLog.WriteString("Current Message History:\n") + for i, msg := range c.messageHistory { + historyLog.WriteString(fmt.Sprintf("%d. MessageID: %s, Sender: %s, Lamport: %d, Content: %s\n", + i+1, msg.MessageId, msg.SenderId, msg.LamportTimestamp, msg.Content)) + } + c.logReliabilityEvent(historyLog.String()) } func (c *Chat) periodicBufferSweep() { @@ -395,18 +430,18 @@ func (c *Chat) addToIncomingBuffer(msg *pb.Message) { c.incomingBuffer = append(c.incomingBuffer, msg) } -func (c *Chat) incLamportTimestamp() { +func (c *Chat) incLamportTimestamp() int32 { c.lamportTSMutex.Lock() defer c.lamportTSMutex.Unlock() - c.lamportTimestamp++ + now := int32(time.Now().Unix()) + c.lamportTimestamp = max32(now, c.lamportTimestamp+1) + return c.lamportTimestamp } func (c *Chat) updateLamportTimestamp(msgTs int32) { c.lamportTSMutex.Lock() defer c.lamportTSMutex.Unlock() - if msgTs > c.lamportTimestamp { - c.lamportTimestamp = msgTs - } + c.lamportTimestamp = max32(msgTs, c.lamportTimestamp) } func (c *Chat) getLamportTimestamp() int32 { @@ -414,3 +449,10 @@ func (c *Chat) getLamportTimestamp() int32 { defer c.lamportTSMutex.Unlock() return c.lamportTimestamp } + +func max32(a, b int32) int32 { + if a > b { + return a + } + return b +} \ No newline at end of file