fix: simplify code by using mutex instead of channels

This commit is contained in:
Richard Ramos 2023-02-08 19:49:15 -04:00 committed by RichΛrd
parent c5faf8b9e9
commit dd9261c981
2 changed files with 32 additions and 58 deletions

View File

@ -157,6 +157,7 @@ func Test500(t *testing.T) {
if err := wakuNode2.Publish(ctx, msg); err != nil { if err := wakuNode2.Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages") require.Fail(t, "Could not publish all messages")
} }
time.Sleep(5 * time.Millisecond)
} }
}() }()

View File

@ -30,18 +30,6 @@ const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type cacheItem struct {
msg *pb.WakuMessage
id string
pubsubTopic string
}
type msgReq struct {
id string
pubsubTopic string
ch chan *pb.WakuMessage
}
type WakuRelay struct { type WakuRelay struct {
host host.Host host host.Host
opts []pubsub.Option opts []pubsub.Option
@ -63,10 +51,11 @@ type WakuRelay struct {
subscriptions map[string][]*Subscription subscriptions map[string][]*Subscription
subscriptionsMutex sync.Mutex subscriptionsMutex sync.Mutex
// TODO: convert to concurrent map with gc
msgCacheMutex sync.Mutex
msgCacheDeletions int
msgCache map[string]*pb.WakuMessage msgCache map[string]*pb.WakuMessage
msgCacheCh chan cacheItem shrinkCacheReq chan struct{}
deleteCacheCh chan string
getMsgCh chan msgReq
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -119,10 +108,8 @@ func (w *WakuRelay) Start(ctx context.Context) error {
w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context
w.cancel = cancel w.cancel = cancel
w.msgCacheCh = make(chan cacheItem) w.shrinkCacheReq = make(chan struct{})
w.deleteCacheCh = make(chan string)
w.msgCache = make(map[string]*pb.WakuMessage, 1000) w.msgCache = make(map[string]*pb.WakuMessage, 1000)
w.getMsgCh = make(chan msgReq)
w.wg.Add(1) w.wg.Add(1)
go w.cacheWorker(ctx) go w.cacheWorker(ctx)
@ -138,58 +125,46 @@ func (w *WakuRelay) Start(ctx context.Context) error {
} }
func (w *WakuRelay) getMessageFromCache(topic string, id string) *pb.WakuMessage { func (w *WakuRelay) getMessageFromCache(topic string, id string) *pb.WakuMessage {
resultCh := make(chan *pb.WakuMessage, 1) w.msgCacheMutex.Lock()
defer close(resultCh)
w.getMsgCh <- msgReq{ key := topic + id
id: id, msg := w.msgCache[key]
pubsubTopic: topic, delete(w.msgCache, key)
ch: resultCh, w.msgCacheDeletions++
deleteCnt := w.msgCacheDeletions
w.msgCacheMutex.Unlock()
if deleteCnt > 1000 {
w.shrinkCacheReq <- struct{}{}
} }
result := <-resultCh return msg
w.deleteCacheCh <- topic + id
return result
} }
func (w *WakuRelay) AddToCache(pubsubTopic string, id string, msg *pb.WakuMessage) { func (w *WakuRelay) AddToCache(topic string, id string, msg *pb.WakuMessage) {
w.msgCacheCh <- cacheItem{ w.msgCacheMutex.Lock()
msg: msg, defer w.msgCacheMutex.Unlock()
id: id,
pubsubTopic: pubsubTopic, key := topic + id
} w.msgCache[key] = msg
} }
func (w *WakuRelay) cacheWorker(ctx context.Context) { func (w *WakuRelay) cacheWorker(ctx context.Context) {
defer w.wg.Done() defer w.wg.Done()
deleteCounter := 0
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case item := <-w.msgCacheCh: case <-w.shrinkCacheReq:
key := item.pubsubTopic + item.id w.msgCacheMutex.Lock()
_, ok := w.msgCache[key]
if !ok {
w.msgCache[item.pubsubTopic+item.id] = item.msg
}
case req := <-w.getMsgCh:
key := req.pubsubTopic + req.id
req.ch <- w.msgCache[key]
case key := <-w.deleteCacheCh:
delete(w.msgCache, key)
deleteCounter++
// Shrink msg cache to avoid oom // Shrink msg cache to avoid oom
if deleteCounter > 1000 {
newMsgCache := make(map[string]*pb.WakuMessage, 1000) newMsgCache := make(map[string]*pb.WakuMessage, 1000)
for k, v := range w.msgCache { for k, v := range w.msgCache {
newMsgCache[k] = v newMsgCache[k] = v
} }
w.msgCache = newMsgCache w.msgCache = newMsgCache
} w.msgCacheMutex.Unlock()
} }
} }
} }
@ -325,9 +300,7 @@ func (w *WakuRelay) Stop() {
w.cancel() w.cancel()
w.wg.Wait() w.wg.Wait()
close(w.msgCacheCh) close(w.shrinkCacheReq)
close(w.deleteCacheCh)
close(w.getMsgCh)
w.msgCache = nil w.msgCache = nil