fix: topic data race

Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
This commit is contained in:
gfanton 2022-10-17 17:37:28 +02:00
parent 7f8acdc3c4
commit 08f039cee2

View File

@ -80,7 +80,7 @@ func (ps *PubSub) getOrCreateTopic(ns string) *PubSubSubscribers {
}
func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds int, counter uint64) {
subscribers := ps.getOrCreateTopic(ns)
topic := ps.getOrCreateTopic(ns)
dataToSend := &pb.RegistrationRecord{
Id: pid.String(),
Addrs: addrs,
@ -88,16 +88,15 @@ func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds
Ttl: time.Now().Add(time.Duration(ttlAsSeconds) * time.Second).UnixMilli(),
}
subscribers.mu.Lock()
subscribers.lastAnnouncement = dataToSend
toNotify := subscribers.subscribers
subscribers.mu.Unlock()
topic.mu.Lock()
topic.lastAnnouncement = dataToSend
toNotify := topic.subscribers
for _, stream := range toNotify {
if err := stream.WriteMsg(dataToSend); err != nil {
log.Errorf("unable to notify rendezvous data update: %s", err.Error())
}
}
topic.mu.Unlock()
}
func (ps *PubSub) Unregister(p peer.ID, ns string) {
@ -144,13 +143,12 @@ func (ps *PubSub) handleStream(s inet.Stream) {
topic.subscribers[s.Conn().RemotePeer()] = w
subscribedTopics[req.DiscoverSubscribe.Ns] = struct{}{}
lastAnnouncement := topic.lastAnnouncement
topic.mu.Unlock()
if lastAnnouncement != nil {
if err := w.WriteMsg(lastAnnouncement); err != nil {
log.Errorf("unable to write announcement: %s", err.Error())
}
}
topic.mu.Unlock()
}
}