fix: filter deadlock (#262)

This commit is contained in:
Martin Kobetic 2022-06-14 11:53:56 -04:00 committed by GitHub
parent 25c3887342
commit 2c2725308f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 28 deletions

View File

@ -14,6 +14,15 @@ type Subscriber struct {
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
func (sub Subscriber) HasContentTopic(topic string) bool {
for _, filter := range sub.filter.ContentFilters {
if filter.ContentTopic == topic {
return true
}
}
return false
}
type Subscribers struct {
sync.RWMutex
subscribers []Subscriber
@ -36,25 +45,16 @@ func (sub *Subscribers) Append(s Subscriber) int {
return len(sub.subscribers)
}
func (subs *Subscribers) SubscriberHasContentTopic(sub Subscriber, topic string) bool {
subs.RLock()
defer subs.RUnlock()
for _, filter := range sub.filter.ContentFilters {
if filter.ContentTopic == topic {
return true
}
}
return false
}
func (sub *Subscribers) Items() <-chan Subscriber {
func (sub *Subscribers) Items(topic *string) <-chan Subscriber {
c := make(chan Subscriber)
f := func() {
sub.RLock()
defer sub.RUnlock()
for _, value := range sub.subscribers {
c <- value
for _, s := range sub.subscribers {
if topic == nil || s.HasContentTopic(*topic) {
c <- s
}
}
close(c)
}

View File

@ -189,7 +189,7 @@ func (wf *WakuFilter) FilterListener() {
g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for subscriber := range wf.subscribers.Items() {
for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) {
logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic {
@ -199,18 +199,15 @@ func (wf *WakuFilter) FilterListener() {
continue
}
if wf.subscribers.SubscriberHasContentTopic(subscriber, msg.ContentTopic) {
logger.Info("found matching content topic", zap.String("contentTopic", msg.ContentTopic))
// Do a message push to light node
logger.Info("pushing message to light node")
g.Go(func() (err error) {
err = wf.pushMessage(subscriber, msg)
if err != nil {
logger.Error("pushing message", zap.Error(err))
}
return err
})
}
// Do a message push to light node
logger.Info("pushing message to light node", zap.String("contentTopic", msg.ContentTopic))
g.Go(func() (err error) {
err = wf.pushMessage(subscriber, msg)
if err != nil {
logger.Error("pushing message", zap.Error(err))
}
return err
})
}
return g.Wait()

View File

@ -215,7 +215,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
time.Sleep(1 * time.Second)
require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed
for subscriber := range node2Filter.subscribers.Items() {
for subscriber := range node2Filter.subscribers.Items(nil) {
if subscriber.peer == node1.h.ID() {
require.Fail(t, "Subscriber should not exist")
}