Fix race condition with PNs
Sometimes the message scheduled & message sent notifications are received out of order. Now we use a single channel for both so order is maintained. This was causing the tests to be flaky and it might have happened in production as well.
This commit is contained in:
parent
4d3c04e41c
commit
4e33e46795
|
@ -43,6 +43,19 @@ type SentMessage struct {
|
|||
MessageIDs [][]byte
|
||||
}
|
||||
|
||||
type MessageEventType uint32
|
||||
|
||||
const (
|
||||
MessageScheduled = iota + 1
|
||||
MessageSent
|
||||
)
|
||||
|
||||
type MessageEvent struct {
|
||||
Type MessageEventType
|
||||
SentMessage *SentMessage
|
||||
RawMessage *RawMessage
|
||||
}
|
||||
|
||||
type MessageSender struct {
|
||||
identity *ecdsa.PrivateKey
|
||||
datasync *datasync.DataSync
|
||||
|
@ -56,10 +69,8 @@ type MessageSender struct {
|
|||
ephemeralKeys map[string]*ecdsa.PrivateKey
|
||||
ephemeralKeysMutex sync.Mutex
|
||||
|
||||
// sentMessagesSubscriptions contains all the subscriptions for sent messages
|
||||
sentMessagesSubscriptions []chan<- *SentMessage
|
||||
// sentMessagesSubscriptions contains all the subscriptions for scheduled messages
|
||||
scheduledMessagesSubscriptions []chan<- *RawMessage
|
||||
// messageEventsSubscriptions contains all the subscriptions for message events
|
||||
messageEventsSubscriptions []chan<- *MessageEvent
|
||||
|
||||
featureFlags FeatureFlags
|
||||
|
||||
|
@ -117,10 +128,10 @@ func NewMessageSender(
|
|||
}
|
||||
|
||||
func (s *MessageSender) Stop() {
|
||||
for _, c := range s.sentMessagesSubscriptions {
|
||||
for _, c := range s.messageEventsSubscriptions {
|
||||
close(c)
|
||||
}
|
||||
s.sentMessagesSubscriptions = nil
|
||||
s.messageEventsSubscriptions = nil
|
||||
s.datasync.Stop() // idempotent op
|
||||
}
|
||||
|
||||
|
@ -734,39 +745,40 @@ func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.Pu
|
|||
return hash, newMessage, nil
|
||||
}
|
||||
|
||||
// SubscribeToSentMessages returns a channel where we publish every time a message is sent
|
||||
func (s *MessageSender) SubscribeToSentMessages() <-chan *SentMessage {
|
||||
c := make(chan *SentMessage, 100)
|
||||
s.sentMessagesSubscriptions = append(s.sentMessagesSubscriptions, c)
|
||||
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
|
||||
c := make(chan *MessageEvent, 100)
|
||||
s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c)
|
||||
return c
|
||||
}
|
||||
|
||||
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
|
||||
event := &MessageEvent{
|
||||
Type: MessageSent,
|
||||
SentMessage: sentMessage,
|
||||
}
|
||||
// Publish on channels, drop if buffer is full
|
||||
for _, c := range s.sentMessagesSubscriptions {
|
||||
for _, c := range s.messageEventsSubscriptions {
|
||||
select {
|
||||
case c <- sentMessage:
|
||||
case c <- event:
|
||||
default:
|
||||
s.logger.Warn("sent messages subscription channel full, dropping message")
|
||||
s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// SubscribeToScheduledMessages returns a channel where we publish every time a message is scheduled for sending
|
||||
func (s *MessageSender) SubscribeToScheduledMessages() <-chan *RawMessage {
|
||||
c := make(chan *RawMessage, 100)
|
||||
s.scheduledMessagesSubscriptions = append(s.scheduledMessagesSubscriptions, c)
|
||||
return c
|
||||
}
|
||||
|
||||
func (s *MessageSender) notifyOnScheduledMessage(message *RawMessage) {
|
||||
event := &MessageEvent{
|
||||
Type: MessageScheduled,
|
||||
RawMessage: message,
|
||||
}
|
||||
|
||||
// Publish on channels, drop if buffer is full
|
||||
for _, c := range s.scheduledMessagesSubscriptions {
|
||||
for _, c := range s.messageEventsSubscriptions {
|
||||
select {
|
||||
case c <- message:
|
||||
case c <- event:
|
||||
default:
|
||||
s.logger.Warn("scheduled messages subscription channel full, dropping message")
|
||||
s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -699,38 +699,32 @@ func (c *Client) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) {
|
|||
func (c *Client) subscribeForMessageEvents() {
|
||||
go func() {
|
||||
c.config.Logger.Debug("subscribing for message events")
|
||||
sentMessagesSubscription := c.messageSender.SubscribeToSentMessages()
|
||||
scheduledMessagesSubscription := c.messageSender.SubscribeToScheduledMessages()
|
||||
messageEventsSubscription := c.messageSender.SubscribeToMessageEvents()
|
||||
for {
|
||||
select {
|
||||
// order is important, since both are asynchronous, we want to process
|
||||
// first scheduled messages, and after sent messages, otherwise we might
|
||||
// have some race conditions.
|
||||
// This does not completely rules them out, but reduced the window
|
||||
// where it might happen, a single channel should be used
|
||||
// if this actually happens.
|
||||
case m, more := <-scheduledMessagesSubscription:
|
||||
case m, more := <-messageEventsSubscription:
|
||||
if !more {
|
||||
c.config.Logger.Debug("no more scheduled messages, quitting")
|
||||
c.config.Logger.Debug("no more message events, quitting")
|
||||
return
|
||||
}
|
||||
c.config.Logger.Debug("handling message scheduled")
|
||||
if err := c.handleMessageScheduled(m); err != nil {
|
||||
c.config.Logger.Error("failed to handle message", zap.Error(err))
|
||||
}
|
||||
|
||||
case m, more := <-sentMessagesSubscription:
|
||||
if !more {
|
||||
c.config.Logger.Debug("no more sent messages, quitting")
|
||||
return
|
||||
}
|
||||
c.config.Logger.Debug("handling message sent")
|
||||
if err := c.handleMessageSent(m); err != nil {
|
||||
c.config.Logger.Error("failed to handle message", zap.Error(err))
|
||||
switch m.Type {
|
||||
case common.MessageScheduled:
|
||||
c.config.Logger.Debug("handling message scheduled")
|
||||
if err := c.handleMessageScheduled(m.RawMessage); err != nil {
|
||||
c.config.Logger.Error("failed to handle message", zap.Error(err))
|
||||
}
|
||||
case common.MessageSent:
|
||||
c.config.Logger.Debug("handling message sent")
|
||||
if err := c.handleMessageSent(m.SentMessage); err != nil {
|
||||
c.config.Logger.Error("failed to handle message", zap.Error(err))
|
||||
}
|
||||
default:
|
||||
c.config.Logger.Warn("message event type not supported")
|
||||
}
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue