From dd9261c981bf86c5be87590e1bc0e82a7d6ed763 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 8 Feb 2023 19:49:15 -0400 Subject: [PATCH] fix: simplify code by using mutex instead of channels --- waku/v2/node/wakunode2_test.go | 1 + waku/v2/protocol/relay/waku_relay.go | 89 ++++++++++------------------ 2 files changed, 32 insertions(+), 58 deletions(-) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 56c09c03..ef657ee6 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -157,6 +157,7 @@ func Test500(t *testing.T) { if err := wakuNode2.Publish(ctx, msg); err != nil { require.Fail(t, "Could not publish all messages") } + time.Sleep(5 * time.Millisecond) } }() diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 424f40d9..47bd2d49 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -30,18 +30,6 @@ const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() -type cacheItem struct { - msg *pb.WakuMessage - id string - pubsubTopic string -} - -type msgReq struct { - id string - pubsubTopic string - ch chan *pb.WakuMessage -} - type WakuRelay struct { host host.Host opts []pubsub.Option @@ -63,10 +51,11 @@ type WakuRelay struct { subscriptions map[string][]*Subscription subscriptionsMutex sync.Mutex - msgCache map[string]*pb.WakuMessage - msgCacheCh chan cacheItem - deleteCacheCh chan string - getMsgCh chan msgReq + // TODO: convert to concurrent map with gc + msgCacheMutex sync.Mutex + msgCacheDeletions int + msgCache map[string]*pb.WakuMessage + shrinkCacheReq chan struct{} ctx context.Context cancel context.CancelFunc @@ -119,10 +108,8 @@ func (w *WakuRelay) Start(ctx context.Context) error { w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context w.cancel = cancel - w.msgCacheCh = make(chan cacheItem) - w.deleteCacheCh = make(chan string) + w.shrinkCacheReq = make(chan struct{}) w.msgCache = make(map[string]*pb.WakuMessage, 1000) - w.getMsgCh = make(chan msgReq) w.wg.Add(1) go w.cacheWorker(ctx) @@ -138,58 +125,46 @@ func (w *WakuRelay) Start(ctx context.Context) error { } func (w *WakuRelay) getMessageFromCache(topic string, id string) *pb.WakuMessage { - resultCh := make(chan *pb.WakuMessage, 1) - defer close(resultCh) + w.msgCacheMutex.Lock() - w.getMsgCh <- msgReq{ - id: id, - pubsubTopic: topic, - ch: resultCh, + key := topic + id + msg := w.msgCache[key] + delete(w.msgCache, key) + w.msgCacheDeletions++ + deleteCnt := w.msgCacheDeletions + w.msgCacheMutex.Unlock() + + if deleteCnt > 1000 { + w.shrinkCacheReq <- struct{}{} } - result := <-resultCh - - w.deleteCacheCh <- topic + id - - return result + return msg } -func (w *WakuRelay) AddToCache(pubsubTopic string, id string, msg *pb.WakuMessage) { - w.msgCacheCh <- cacheItem{ - msg: msg, - id: id, - pubsubTopic: pubsubTopic, - } +func (w *WakuRelay) AddToCache(topic string, id string, msg *pb.WakuMessage) { + w.msgCacheMutex.Lock() + defer w.msgCacheMutex.Unlock() + + key := topic + id + w.msgCache[key] = msg } func (w *WakuRelay) cacheWorker(ctx context.Context) { defer w.wg.Done() - deleteCounter := 0 for { select { case <-ctx.Done(): return - case item := <-w.msgCacheCh: - key := item.pubsubTopic + item.id - _, ok := w.msgCache[key] - if !ok { - w.msgCache[item.pubsubTopic+item.id] = item.msg - } - case req := <-w.getMsgCh: - key := req.pubsubTopic + req.id - req.ch <- w.msgCache[key] - case key := <-w.deleteCacheCh: - delete(w.msgCache, key) - deleteCounter++ + case <-w.shrinkCacheReq: + w.msgCacheMutex.Lock() // Shrink msg cache to avoid oom - if deleteCounter > 1000 { - newMsgCache := make(map[string]*pb.WakuMessage, 1000) - for k, v := range w.msgCache { - newMsgCache[k] = v - } - w.msgCache = newMsgCache + newMsgCache := make(map[string]*pb.WakuMessage, 1000) + for k, v := range w.msgCache { + newMsgCache[k] = v } + w.msgCache = newMsgCache + w.msgCacheMutex.Unlock() } } } @@ -325,9 +300,7 @@ func (w *WakuRelay) Stop() { w.cancel() w.wg.Wait() - close(w.msgCacheCh) - close(w.deleteCacheCh) - close(w.getMsgCh) + close(w.shrinkCacheReq) w.msgCache = nil