feat: e2e rel poc - reconnection, new lamport Ts, logging (#1220)

This commit is contained in:
Akhil 2024-09-12 13:53:57 +04:00 committed by GitHub
parent 2b61569558
commit bc2444ca46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 10 deletions

View File

@ -172,6 +172,9 @@ func (c *Chat) receiveMessages() {
func (c *Chat) parseInput() { func (c *Chat) parseInput() {
defer c.wg.Done() defer c.wg.Done()
var disconnectedPeers []peer.ID
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@ -267,10 +270,32 @@ func (c *Chat) parseInput() {
/connect multiaddress - dials a node adding it to the list of connected peers /connect multiaddress - dials a node adding it to the list of connected peers
/peers - list of peers connected to this node /peers - list of peers connected to this node
/nick newNick - change the user's nickname /nick newNick - change the user's nickname
/disconnect - disconnect from all currently connected peers
/reconnect - attempt to reconnect to previously disconnected peers
/exit - closes the app`) /exit - closes the app`)
return return
} }
// Disconnect from peers
if line == "/disconnect" {
disconnectedPeers = c.disconnectFromPeers()
c.ui.InfoMessage("Disconnected from all peers. Use /reconnect to reconnect.")
return
}
// Reconnect to peers
if line == "/reconnect" {
if len(disconnectedPeers) == 0 {
c.ui.InfoMessage("No disconnection active. Use /disconnect first.")
} else {
c.reconnectToPeers(disconnectedPeers)
disconnectedPeers = nil
c.ui.InfoMessage("Reconnection initiated.")
}
return
}
// If no command matched, send as a regular message
c.SendMessage(line) c.SendMessage(line)
}() }()
} }
@ -521,6 +546,29 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
} }
} }
func (c *Chat) disconnectFromPeers() []peer.ID {
disconnectedPeers := c.node.Host().Network().Peers()
for _, peerID := range disconnectedPeers {
c.node.Host().Network().ClosePeer(peerID)
}
return disconnectedPeers
}
func (c *Chat) reconnectToPeers(peers []peer.ID) {
for _, peerID := range peers {
// We're using a goroutine here to avoid blocking if a peer is unreachable
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
if _, err := c.node.Host().Network().DialPeer(ctx, p); err != nil {
c.ui.ErrorMessage(fmt.Errorf("failed to reconnect to peer %s: %w", p, err))
} else {
c.ui.InfoMessage(fmt.Sprintf("Successfully reconnected to peer %s", p))
}
}(peerID)
}
}
func generateUniqueID() string { func generateUniqueID() string {
return uuid.New().String() return uuid.New().String()
} }

View File

@ -141,7 +141,7 @@ func TestLamportTimestamps(t *testing.T) {
return true return true
}, 30*time.Second, 1*time.Second, "Message propagation failed") }, 30*time.Second, 1*time.Second, "Message propagation failed")
assert.Equal(t, int32(1), env.chats[0].getLamportTimestamp(), "Sender's Lamport timestamp should be 1") assert.Greater(t, env.chats[0].getLamportTimestamp(), int32(0), "Sender's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[1].getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0") assert.Greater(t, env.chats[1].getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[2].getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0") assert.Greater(t, env.chats[2].getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0")

View File

@ -5,6 +5,9 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"os"
"strings"
"time" "time"
"github.com/bits-and-blooms/bloom/v3" "github.com/bits-and-blooms/bloom/v3"
@ -18,15 +21,17 @@ const (
bufferSweepInterval = 5 * time.Second bufferSweepInterval = 5 * time.Second
syncMessageInterval = 30 * time.Second syncMessageInterval = 30 * time.Second
messageAckTimeout = 60 * time.Second messageAckTimeout = 60 * time.Second
maxRetries = 3 maxRetries = 5
retryBaseDelay = 1 * time.Second retryBaseDelay = 3 * time.Second
maxRetryDelay = 10 * time.Second maxRetryDelay = 30 * time.Second
ackTimeout = 5 * time.Second ackTimeout = 5 * time.Second
maxResendAttempts = 5 maxResendAttempts = 5
resendBaseDelay = 1 * time.Second resendBaseDelay = 1 * time.Second
maxResendDelay = 30 * time.Second maxResendDelay = 30 * time.Second
) )
var reliabilityLogger *log.Logger
func (c *Chat) initReliabilityProtocol() { func (c *Chat) initReliabilityProtocol() {
c.wg.Add(4) c.wg.Add(4)
c.setupMessageRequestHandler() c.setupMessageRequestHandler()
@ -37,6 +42,18 @@ func (c *Chat) initReliabilityProtocol() {
go c.startEagerPushMechanism() go c.startEagerPushMechanism()
} }
func init() {
file, err := os.OpenFile("reliability.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
reliabilityLogger = log.New(file, "", log.LstdFlags)
}
func (c *Chat) logReliabilityEvent(message string) {
reliabilityLogger.Println(message)
}
func (c *Chat) startEagerPushMechanism() { func (c *Chat) startEagerPushMechanism() {
defer c.wg.Done() defer c.wg.Done()
@ -140,6 +157,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) {
c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content) c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content)
// Add to message history // Add to message history
c.addToMessageHistory(msg) c.addToMessageHistory(msg)
c.logReliabilityEvent(fmt.Sprintf("Processed message %s with Lamport timestamp %d", msg.MessageId, msg.LamportTimestamp))
} }
// Process any messages in the buffer that now have their dependencies met // Process any messages in the buffer that now have their dependencies met
@ -151,6 +169,7 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) {
} }
// Add to incoming buffer // Add to incoming buffer
c.addToIncomingBuffer(msg) c.addToIncomingBuffer(msg)
c.logReliabilityEvent(fmt.Sprintf("Message %s buffered due to missing dependencies: %v", msg.MessageId, missingDeps))
} }
} }
@ -218,6 +237,7 @@ func (c *Chat) requestMissingMessage(messageID string) {
missedMsg, err := c.doRequestMissingMessageFromPeers(messageID) missedMsg, err := c.doRequestMissingMessageFromPeers(messageID)
if err == nil { if err == nil {
c.processReceivedMessage(missedMsg) c.processReceivedMessage(missedMsg)
c.logReliabilityEvent(fmt.Sprintf("Successfully retrieved missing message %s", messageID))
return return
} }
@ -229,7 +249,7 @@ func (c *Chat) requestMissingMessage(messageID string) {
time.Sleep(delay) time.Sleep(delay)
} }
c.ui.ErrorMessage(fmt.Errorf("failed to retrieve missing message %s after %d attempts", messageID, maxRetries)) c.logReliabilityEvent(fmt.Sprintf("Failed to retrieve missing message %s after %d attempts", messageID, maxRetries))
} }
func (c *Chat) checkCausalDependencies(msg *pb.Message) []string { func (c *Chat) checkCausalDependencies(msg *pb.Message) []string {
@ -279,6 +299,21 @@ func (c *Chat) addToMessageHistory(msg *pb.Message) {
if len(c.messageHistory) > maxMessageHistory { if len(c.messageHistory) > maxMessageHistory {
c.messageHistory = c.messageHistory[len(c.messageHistory)-maxMessageHistory:] c.messageHistory = c.messageHistory[len(c.messageHistory)-maxMessageHistory:]
} }
c.logReliabilityEvent(fmt.Sprintf("Added message %s to history at position %d with Lamport timestamp %d", msg.MessageId, insertIndex, msg.LamportTimestamp))
// Log the entire message history
c.logMessageHistory()
}
func (c *Chat) logMessageHistory() {
var historyLog strings.Builder
historyLog.WriteString("Current Message History:\n")
for i, msg := range c.messageHistory {
historyLog.WriteString(fmt.Sprintf("%d. MessageID: %s, Sender: %s, Lamport: %d, Content: %s\n",
i+1, msg.MessageId, msg.SenderId, msg.LamportTimestamp, msg.Content))
}
c.logReliabilityEvent(historyLog.String())
} }
func (c *Chat) periodicBufferSweep() { func (c *Chat) periodicBufferSweep() {
@ -395,18 +430,18 @@ func (c *Chat) addToIncomingBuffer(msg *pb.Message) {
c.incomingBuffer = append(c.incomingBuffer, msg) c.incomingBuffer = append(c.incomingBuffer, msg)
} }
func (c *Chat) incLamportTimestamp() { func (c *Chat) incLamportTimestamp() int32 {
c.lamportTSMutex.Lock() c.lamportTSMutex.Lock()
defer c.lamportTSMutex.Unlock() defer c.lamportTSMutex.Unlock()
c.lamportTimestamp++ now := int32(time.Now().Unix())
c.lamportTimestamp = max32(now, c.lamportTimestamp+1)
return c.lamportTimestamp
} }
func (c *Chat) updateLamportTimestamp(msgTs int32) { func (c *Chat) updateLamportTimestamp(msgTs int32) {
c.lamportTSMutex.Lock() c.lamportTSMutex.Lock()
defer c.lamportTSMutex.Unlock() defer c.lamportTSMutex.Unlock()
if msgTs > c.lamportTimestamp { c.lamportTimestamp = max32(msgTs, c.lamportTimestamp)
c.lamportTimestamp = msgTs
}
} }
func (c *Chat) getLamportTimestamp() int32 { func (c *Chat) getLamportTimestamp() int32 {
@ -414,3 +449,10 @@ func (c *Chat) getLamportTimestamp() int32 {
defer c.lamportTSMutex.Unlock() defer c.lamportTSMutex.Unlock()
return c.lamportTimestamp return c.lamportTimestamp
} }
func max32(a, b int32) int32 {
if a > b {
return a
}
return b
}