diff --git a/go.mod b/go.mod index 904f0553..b197eec7 100644 --- a/go.mod +++ b/go.mod @@ -31,5 +31,6 @@ require ( github.com/urfave/cli/v2 v2.4.0 go.opencensus.io v0.23.0 go.uber.org/zap v1.21.0 + golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 93863047..8204ea92 100644 --- a/go.sum +++ b/go.sum @@ -1224,6 +1224,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index fd1ca234..095efc1c 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -19,6 +19,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -187,9 +188,11 @@ func (wf *WakuFilter) FilterListener() { handle := func(envelope *protocol.Envelope) error { // async msg := envelope.Message() topic := envelope.PubsubTopic() + g, _ := errgroup.WithContext(context.Background()) // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node for subscriber := range wf.subscribers.Items() { + subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) continue @@ -200,15 +203,20 @@ func (wf *WakuFilter) FilterListener() { wf.log.Info("found matching contentTopic ", filter, msg) // Do a message push to light node wf.log.Info("pushing messages to light node: ", subscriber.peer) - if err := wf.pushMessage(subscriber, msg); err != nil { + g.Go(func() (err error) { + err = wf.pushMessage(subscriber, msg) + if err != nil { + wf.log.Error(err) + } return err - } - + }) + // Break if we have found a match + break } } } - return nil + return g.Wait() } for m := range wf.MsgC {