From 08f039cee29f016b2599092dad51f171fb9b9689 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Mon, 17 Oct 2022 17:37:28 +0200 Subject: [PATCH] fix: topic data race Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- sync_inmem_provider.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sync_inmem_provider.go b/sync_inmem_provider.go index 171cd51..b1ce7c7 100644 --- a/sync_inmem_provider.go +++ b/sync_inmem_provider.go @@ -80,7 +80,7 @@ func (ps *PubSub) getOrCreateTopic(ns string) *PubSubSubscribers { } func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds int, counter uint64) { - subscribers := ps.getOrCreateTopic(ns) + topic := ps.getOrCreateTopic(ns) dataToSend := &pb.RegistrationRecord{ Id: pid.String(), Addrs: addrs, @@ -88,16 +88,15 @@ func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds Ttl: time.Now().Add(time.Duration(ttlAsSeconds) * time.Second).UnixMilli(), } - subscribers.mu.Lock() - subscribers.lastAnnouncement = dataToSend - toNotify := subscribers.subscribers - subscribers.mu.Unlock() - + topic.mu.Lock() + topic.lastAnnouncement = dataToSend + toNotify := topic.subscribers for _, stream := range toNotify { if err := stream.WriteMsg(dataToSend); err != nil { log.Errorf("unable to notify rendezvous data update: %s", err.Error()) } } + topic.mu.Unlock() } func (ps *PubSub) Unregister(p peer.ID, ns string) { @@ -144,13 +143,12 @@ func (ps *PubSub) handleStream(s inet.Stream) { topic.subscribers[s.Conn().RemotePeer()] = w subscribedTopics[req.DiscoverSubscribe.Ns] = struct{}{} lastAnnouncement := topic.lastAnnouncement - topic.mu.Unlock() - if lastAnnouncement != nil { if err := w.WriteMsg(lastAnnouncement); err != nil { log.Errorf("unable to write announcement: %s", err.Error()) } } + topic.mu.Unlock() } }