From c5faf8b9e94fe9b8b8848b00f4b372a14206974f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 8 Feb 2023 15:20:42 -0400 Subject: [PATCH] fix: race condition deleting items from cache --- waku/v2/node/wakunode2_test.go | 4 ++-- waku/v2/protocol/relay/waku_relay.go | 34 ++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 117f5d3a..56c09c03 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -59,8 +59,8 @@ func int2Bytes(i int) []byte { return append(big.NewInt(int64(i)).Bytes(), byte(0)) } -func Test5000(t *testing.T) { - maxMsgs := 5000 +func Test500(t *testing.T) { + maxMsgs := 500 maxMsgBytes := int2Bytes(maxMsgs) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index edb75e49..424f40d9 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -68,6 +68,7 @@ type WakuRelay struct { deleteCacheCh chan string getMsgCh chan msgReq + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } @@ -115,12 +116,13 @@ func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, ti func (w *WakuRelay) Start(ctx context.Context) error { w.wg.Wait() ctx, cancel := context.WithCancel(ctx) + w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context w.cancel = cancel - w.msgCacheCh = make(chan cacheItem, 1000) - w.deleteCacheCh = make(chan string, 1000) + w.msgCacheCh = make(chan cacheItem) + w.deleteCacheCh = make(chan string) w.msgCache = make(map[string]*pb.WakuMessage, 1000) - w.getMsgCh = make(chan msgReq, 1000) + w.getMsgCh = make(chan msgReq) w.wg.Add(1) go w.cacheWorker(ctx) @@ -145,7 +147,11 @@ func (w *WakuRelay) getMessageFromCache(topic string, id string) *pb.WakuMessage ch: resultCh, } - return <-resultCh + result := <-resultCh + + w.deleteCacheCh <- topic + id + + return result } func (w *WakuRelay) AddToCache(pubsubTopic string, id string, msg *pb.WakuMessage) { @@ -165,11 +171,14 @@ func (w *WakuRelay) cacheWorker(ctx context.Context) { case <-ctx.Done(): return case item := <-w.msgCacheCh: - w.msgCache[item.pubsubTopic+item.id] = item.msg + key := item.pubsubTopic + item.id + _, ok := w.msgCache[key] + if !ok { + w.msgCache[item.pubsubTopic+item.id] = item.msg + } case req := <-w.getMsgCh: key := req.pubsubTopic + req.id req.ch <- w.msgCache[key] - w.deleteCacheCh <- key case key := <-w.deleteCacheCh: delete(w.msgCache, key) deleteCounter++ @@ -365,6 +374,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr w.bcaster.Register(&topic, subscription.C) } + w.wg.Add(1) go w.subscribeToTopic(ctx, topic, subscription, sub) return subscription, nil @@ -428,16 +438,22 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < return msgChannel } -func (w *WakuRelay) subscribeToTopic(ctx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) { - ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "relay")) +func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) { + defer w.wg.Done() + + ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay")) if err != nil { w.log.Error("creating tag map", zap.Error(err)) return } - subChannel := w.nextMessage(ctx, sub) + subChannel := w.nextMessage(w.ctx, sub) for { select { + case <-userCtx.Done(): + return + case <-ctx.Done(): + return case <-subscription.quit: func(topic string) { subscription.Lock()