fix: race condition deleting items from cache

This commit is contained in:
Richard Ramos 2023-02-08 15:20:42 -04:00 committed by RichΛrd
parent 144dfa5b7b
commit c5faf8b9e9
2 changed files with 27 additions and 11 deletions

View File

@ -59,8 +59,8 @@ func int2Bytes(i int) []byte {
return append(big.NewInt(int64(i)).Bytes(), byte(0))
}
func Test5000(t *testing.T) {
maxMsgs := 5000
func Test500(t *testing.T) {
maxMsgs := 500
maxMsgBytes := int2Bytes(maxMsgs)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

View File

@ -68,6 +68,7 @@ type WakuRelay struct {
deleteCacheCh chan string
getMsgCh chan msgReq
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
@ -115,12 +116,13 @@ func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, ti
func (w *WakuRelay) Start(ctx context.Context) error {
w.wg.Wait()
ctx, cancel := context.WithCancel(ctx)
w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context
w.cancel = cancel
w.msgCacheCh = make(chan cacheItem, 1000)
w.deleteCacheCh = make(chan string, 1000)
w.msgCacheCh = make(chan cacheItem)
w.deleteCacheCh = make(chan string)
w.msgCache = make(map[string]*pb.WakuMessage, 1000)
w.getMsgCh = make(chan msgReq, 1000)
w.getMsgCh = make(chan msgReq)
w.wg.Add(1)
go w.cacheWorker(ctx)
@ -145,7 +147,11 @@ func (w *WakuRelay) getMessageFromCache(topic string, id string) *pb.WakuMessage
ch: resultCh,
}
return <-resultCh
result := <-resultCh
w.deleteCacheCh <- topic + id
return result
}
func (w *WakuRelay) AddToCache(pubsubTopic string, id string, msg *pb.WakuMessage) {
@ -165,11 +171,14 @@ func (w *WakuRelay) cacheWorker(ctx context.Context) {
case <-ctx.Done():
return
case item := <-w.msgCacheCh:
w.msgCache[item.pubsubTopic+item.id] = item.msg
key := item.pubsubTopic + item.id
_, ok := w.msgCache[key]
if !ok {
w.msgCache[item.pubsubTopic+item.id] = item.msg
}
case req := <-w.getMsgCh:
key := req.pubsubTopic + req.id
req.ch <- w.msgCache[key]
w.deleteCacheCh <- key
case key := <-w.deleteCacheCh:
delete(w.msgCache, key)
deleteCounter++
@ -365,6 +374,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr
w.bcaster.Register(&topic, subscription.C)
}
w.wg.Add(1)
go w.subscribeToTopic(ctx, topic, subscription, sub)
return subscription, nil
@ -428,16 +438,22 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(ctx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "relay"))
func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) {
defer w.wg.Done()
ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
w.log.Error("creating tag map", zap.Error(err))
return
}
subChannel := w.nextMessage(ctx, sub)
subChannel := w.nextMessage(w.ctx, sub)
for {
select {
case <-userCtx.Done():
return
case <-ctx.Done():
return
case <-subscription.quit:
func(topic string) {
subscription.Lock()